* Work in progress to redo live view implementation to be less invasive

This commit is contained in:
Vitaliy Zakaznikov 2019-05-26 18:03:30 -04:00
parent aabee36dac
commit 9dd07bcc23
11 changed files with 1406 additions and 0 deletions

View File

@ -0,0 +1,51 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** A stream of blocks from a shared vector of blocks
*/
class BlocksBlockInputStream : public IBlockInputStream
{
public:
/// Acquires shared ownership of the blocks vector
BlocksBlockInputStream(std::shared_ptr<BlocksPtr> blocks_ptr_, Block header)
: blocks_ptr(blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()), header(header) {}
String getName() const override { return "Blocks"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override
{
if (it == end)
return Block();
Block res = *it;
++it;
return res;
}
private:
std::shared_ptr<BlocksPtr> blocks_ptr;
Blocks::iterator it;
const Blocks::iterator end;
Block header;
};
}

View File

@ -0,0 +1,209 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <limits>
#include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Condition.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageLiveView.h>
namespace DB
{
/**
*/
class LiveViewBlockInputStream : public IBlockInputStream
{
using NonBlockingResult = std::pair<Block, bool>;
public:
~LiveViewBlockInputStream() override
{
/// Start storage no users thread
/// if we are the last active user
if (!storage.is_dropped && blocks_ptr.use_count() < 3)
storage.startNoUsersThread();
}
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
/// and LIMIT 0 just returns data without waiting for any updates
LiveViewBlockInputStream(StorageLiveView & storage_, std::shared_ptr<BlocksPtr> blocks_ptr_, std::shared_ptr<bool> active_ptr_, Poco::Condition & condition_, Poco::FastMutex & mutex_,
int64_t length_, const UInt64 & heartbeat_delay_)
: storage(storage_), blocks_ptr(blocks_ptr_), active_ptr(active_ptr_), condition(condition_), mutex(mutex_), length(length_ + 1), heartbeat_delay(heartbeat_delay_), blocks_hash("")
{
/// grab active pointer
active = active_ptr.lock();
}
String getName() const override { return "LiveViewBlockInputStream"; }
void cancel(bool kill) override
{
if (isCancelled() || storage.is_dropped)
return;
IBlockInputStream::cancel(kill);
Poco::FastMutex::ScopedLock lock(mutex);
condition.broadcast();
}
Block getHeader() const override { return storage.getHeader(); }
void refresh()
{
if (active && blocks && it == end)
it = blocks->begin();
}
void suspend()
{
active.reset();
}
void resume()
{
active = active_ptr.lock();
{
if (!blocks || blocks.get() != (*blocks_ptr).get())
blocks = (*blocks_ptr);
}
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
NonBlockingResult tryRead()
{
return tryRead_(false);
}
protected:
Block readImpl() override
{
/// try reading
return tryRead_(true).first;
}
/** tryRead method attempts to read a block in either blocking
* or non-blocking mode. If blocking is set to false
* then method return empty block with flag set to false
* to indicate that method would block to get the next block.
*/
NonBlockingResult tryRead_(bool blocking)
{
Block res;
if (length == 0)
{
return { Block(), true };
}
/// If blocks were never assigned get blocks
if (!blocks)
{
Poco::FastMutex::ScopedLock lock(mutex);
if (!active)
return { Block(), false };
blocks = (*blocks_ptr);
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
if (isCancelled() || storage.is_dropped)
{
return { Block(), true };
}
if (it == end)
{
{
Poco::FastMutex::ScopedLock lock(mutex);
if (!active)
return { Block(), false };
/// If we are done iterating over our blocks
/// and there are new blocks availble then get them
if (blocks.get() != (*blocks_ptr).get())
{
blocks = (*blocks_ptr);
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
/// No new blocks available wait for new ones
else
{
if (!blocking)
{
return { Block(), false };
}
while (true)
{
bool signaled = condition.tryWait(mutex, std::max((UInt64)0, heartbeat_delay - ((UInt64)timestamp.epochMicroseconds() - last_event_timestamp)) / 1000);
if (isCancelled() || storage.is_dropped)
{
return { Block(), true };
}
if (signaled)
{
break;
}
else
{
//hashmap["blocks"] = blocks_hash;
last_event_timestamp = (UInt64)timestamp.epochMicroseconds();
//heartbeat(Heartbeat(last_event_timestamp, std::move(hashmap)));
}
}
}
}
return tryRead_(blocking);
}
res = *it;
++it;
if (it == end)
{
if (length > 0)
--length;
}
last_event_timestamp = (UInt64)timestamp.epochMicroseconds();
return { res, true };
}
private:
StorageLiveView & storage;
std::shared_ptr<BlocksPtr> blocks_ptr;
std::weak_ptr<bool> active_ptr;
std::shared_ptr<bool> active;
BlocksPtr blocks;
Blocks::iterator it;
Blocks::iterator end;
Blocks::iterator begin;
Poco::Condition & condition;
Poco::FastMutex & mutex;
/// Length specifies number of updates to send, default -1 (no limit)
int64_t length;
UInt64 heartbeat_delay;
String blocks_hash;
UInt64 last_event_timestamp{0};
Poco::Timestamp timestamp;
};
}

View File

@ -0,0 +1,102 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <Core/Settings.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTWatchQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_STORAGE;
extern const int TOO_MANY_COLUMNS;
}
/// Returns a OneBlockInputStream built from a default-constructed Block.
BlockInputStreamPtr InterpreterWatchQuery::executeImpl()
{
    BlockInputStreamPtr stream = std::make_shared<OneBlockInputStream>(Block{});
    return stream;
}
/** Executes a WATCH query.
  *
  * Resolves the target table (the current database is used when the query does not
  * name one), enforces the `max_columns_to_read` setting, asks the storage for its
  * watch stream and applies the result limits and the quota to it.
  *
  * Returns a BlockIO whose `in` is the first stream returned by the storage.
  * Throws UNKNOWN_STORAGE if the table does not exist and TOO_MANY_COLUMNS if the
  * table has more physical columns than allowed.
  */
BlockIO InterpreterWatchQuery::execute()
{
    BlockIO res;
    const ASTWatchQuery & query = typeid_cast<const ASTWatchQuery &>(*query_ptr);
    String database;
    String table;

    /// Get database
    if (!query.database.empty())
        database = query.database;
    else
        database = context.getCurrentDatabase();

    /// Get table
    table = query.table;

    /// Get storage
    storage = context.tryGetTable(database, table);

    /// tryGetTable() is used as a possibly-null pointer elsewhere in this code base,
    /// so a missing table must be reported instead of being dereferenced.
    if (!storage)
        throw Exception("Table " + database + "." + table + " doesn't exist.",
            ErrorCodes::UNKNOWN_STORAGE);

    /// List of columns to read to execute the query.
    Names required_columns = storage->getColumns().getNamesOfPhysical();

    /// Get context settings for this query
    const Settings & settings = context.getSettingsRef();

    /// Limitation on the number of columns to read.
    if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
        throw Exception("Limit for number of columns to read exceeded. "
            "Requested: " + std::to_string(required_columns.size())
            + ", maximum: " + settings.max_columns_to_read.toString(),
            ErrorCodes::TOO_MANY_COLUMNS);

    size_t max_block_size = settings.max_block_size;
    size_t max_streams = 1;

    /// Define query info
    SelectQueryInfo query_info;
    query_info.query = query_ptr;

    /// From stage
    QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
    QueryProcessingStage::Enum to_stage = QueryProcessingStage::Complete;

    /// Watch storage
    streams = storage->watch(required_columns, query_info, context, from_stage, max_block_size, max_streams);

    /// Constraints on the result, the quota on the result, and also callback for progress.
    if (IBlockInputStream * stream = dynamic_cast<IBlockInputStream *>(streams[0].get()))
    {
        /// Constraints apply only to the final result.
        if (to_stage == QueryProcessingStage::Complete)
        {
            IBlockInputStream::LocalLimits limits;
            limits.mode = IBlockInputStream::LIMITS_CURRENT;
            limits.size_limits.max_rows = settings.max_result_rows;
            limits.size_limits.max_bytes = settings.max_result_bytes;
            limits.size_limits.overflow_mode = settings.result_overflow_mode;

            stream->setLimits(limits);
            stream->setQuota(context.getQuota());
        }
    }

    res.in = streams[0];
    return res;
}
}

View File

@ -0,0 +1,50 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/IInterpreter.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
namespace DB
{
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
using StoragePtr = std::shared_ptr<IStorage>;
/// Interpreter of WATCH queries. Keeps the query AST and a reference to the
/// context it was created with; the work is done in execute().
class InterpreterWatchQuery : public IInterpreter
{
public:
    InterpreterWatchQuery(const ASTPtr & query_ptr_, Context & context_)
        : query_ptr(query_ptr_)
        , context(context_)
    {
    }

    BlockIO execute() override;

private:
    BlockInputStreamPtr executeImpl();

    /// AST of the query being interpreted.
    ASTPtr query_ptr;
    Context & context;

    /// Table from where to read data, if not subquery.
    StoragePtr storage;

    /// Streams of read data
    BlockInputStreams streams;
};
}

View File

@ -0,0 +1,59 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Parsers/ASTQueryWithTableAndOutput.h>
namespace DB
{
/** AST node of a query of the form
  *   WATCH [db.]table [EVENTS] [LIMIT n]
  */
class ASTWatchQuery : public ASTQueryWithTableAndOutput
{
public:
    /// LIMIT argument; null when the query has no LIMIT clause.
    ASTPtr limit_length;

    /// True when the EVENTS keyword is present. The parser only ever sets it to true,
    /// so it has to start out as false to be safe to read in formatQueryImpl().
    bool is_watch_events = false;

    ASTWatchQuery() = default;

    String getID(char) const override { return "WatchQuery_" + database + "_" + table; }

    /// Copies the node, drops the copied children and re-clones the output options.
    ASTPtr clone() const override
    {
        std::shared_ptr<ASTWatchQuery> res = std::make_shared<ASTWatchQuery>(*this);
        res->children.clear();
        cloneOutputOptions(*res);
        return res;
    }

protected:
    /// Writes the query back as text: WATCH, the (optionally qualified) table name,
    /// then EVENTS and LIMIT when they are present.
    void formatQueryImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override
    {
        std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' ');

        s.ostr << (s.hilite ? hilite_keyword : "") << "WATCH" << " " << (s.hilite ? hilite_none : "")
            << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);

        if (is_watch_events)
        {
            s.ostr << " " << (s.hilite ? hilite_keyword : "") << "EVENTS" << (s.hilite ? hilite_none : "");
        }

        if (limit_length)
        {
            s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : "");
            limit_length->formatImpl(s, state, frame);
        }
    }
};
}

View File

@ -0,0 +1,77 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ParserWatchQuery.h>
#include <Parsers/ExpressionElementParsers.h>
namespace DB
{
/// Parses `WATCH [db.]table [EVENTS] [LIMIT number]` into an ASTWatchQuery.
/// Returns false if the WATCH keyword, a table identifier, or the number after
/// LIMIT cannot be parsed; `node` is assigned only on success.
bool ParserWatchQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserKeyword watch_keyword("WATCH");
    ParserToken dot_token(TokenType::Dot);
    ParserIdentifier identifier_parser;
    ParserKeyword events_keyword("EVENTS");
    ParserKeyword limit_keyword("LIMIT");

    ASTPtr database_ast;
    ASTPtr table_ast;
    auto watch_query = std::make_shared<ASTWatchQuery>();

    /// WATCH name
    if (!watch_keyword.ignore(pos, expected) || !identifier_parser.parse(pos, table_ast, expected))
        return false;

    /// A dot means that the first identifier was the database name.
    if (dot_token.ignore(pos, expected))
    {
        database_ast = table_ast;
        if (!identifier_parser.parse(pos, table_ast, expected))
            return false;
    }

    /// EVENTS
    if (events_keyword.ignore(pos, expected))
        watch_query->is_watch_events = true;

    /// LIMIT length
    if (limit_keyword.ignore(pos, expected))
    {
        ParserNumber number_parser;
        const bool parsed = number_parser.parse(pos, watch_query->limit_length, expected);
        if (!parsed)
            return false;
    }

    if (database_ast)
        getIdentifierName(database_ast, watch_query->database);

    if (table_ast)
        getIdentifierName(table_ast, watch_query->table);

    node = watch_query;
    return true;
}
}

View File

@ -0,0 +1,30 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/** Query like this:
* WATCH [db.]table EVENTS
*/
class ParserWatchQuery : public IParserBase
{
protected:
const char * getName() const { return "WATCH query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
}

View File

@ -0,0 +1,65 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Parsers/ASTAlterQuery.h>
#include <optional>
#include <Storages/StorageLiveView.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_STORAGE;
}
/// A single ALTER command that applies to a live view.
struct LiveViewCommand
{
    enum Type
    {
        REFRESH
    };

    Type type;

    ASTPtr values;

    /// Builds a REFRESH command carrying `values`.
    static LiveViewCommand refresh(const ASTPtr & values)
    {
        LiveViewCommand refresh_command;
        refresh_command.values = values;
        refresh_command.type = REFRESH;
        return refresh_command;
    }

    /// Converts a parsed ALTER command into a live view command.
    /// Returns an empty optional for every command type other than LIVE_VIEW_REFRESH.
    static std::optional<LiveViewCommand> parse(ASTAlterCommand * command)
    {
        if (command->type != ASTAlterCommand::LIVE_VIEW_REFRESH)
            return std::nullopt;

        return refresh(command->values);
    }
};
class LiveViewCommands : public std::vector<LiveViewCommand>
{
public:
void validate(const IStorage & table)
{
if (!empty() && !dynamic_cast<const StorageLiveView *>(&table))
throw Exception("Wrong storage type. Must be StorageLiveView", DB::ErrorCodes::UNKNOWN_STORAGE);
}
};
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <Storages/IStorage.h>
namespace DB
{
class ProxyStorage : public IStorage
{
public:
ProxyStorage(StoragePtr storage, BlockInputStreams streams, QueryProcessingStage::Enum to_stage)
: storage(std::move(storage)), streams(std::move(streams)), to_stage(to_stage) {}
public:
std::string getName() const override { return "ProxyStorage(" + storage->getName() + ")"; }
std::string getTableName() const override { return storage->getTableName(); }
bool isRemote() const override { return storage->isRemote(); }
bool supportsSampling() const override { return storage->supportsSampling(); }
bool supportsFinal() const override { return storage->supportsFinal(); }
bool supportsPrewhere() const override { return storage->supportsPrewhere(); }
bool supportsReplication() const override { return storage->supportsReplication(); }
bool supportsDeduplication() const override { return storage->supportsDeduplication(); }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; }
BlockInputStreams read(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/) override
{
return streams;
}
bool supportsIndexForIn() const override { return storage->supportsIndexForIn(); }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override { return storage->mayBenefitFromIndexForIn(left_in_operand, query_context); }
ASTPtr getPartitionKeyAST() const override { return storage->getPartitionKeyAST(); }
ASTPtr getSortingKeyAST() const override { return storage->getSortingKeyAST(); }
ASTPtr getPrimaryKeyAST() const override { return storage->getPrimaryKeyAST(); }
ASTPtr getSamplingKeyAST() const override { return storage->getSamplingKeyAST(); }
Names getColumnsRequiredForPartitionKey() const override { return storage->getColumnsRequiredForPartitionKey(); }
Names getColumnsRequiredForSortingKey() const override { return storage->getColumnsRequiredForSortingKey(); }
Names getColumnsRequiredForPrimaryKey() const override { return storage->getColumnsRequiredForPrimaryKey(); }
Names getColumnsRequiredForSampling() const override { return storage->getColumnsRequiredForSampling(); }
Names getColumnsRequiredForFinal() const override { return storage->getColumnsRequiredForFinal(); }
NameAndTypePair getColumn(const String & column_name) const override { return storage->getColumn(column_name); }
bool hasColumn(const String & column_name) const override { return storage->hasColumn(column_name); }
static StoragePtr createProxyStorage(StoragePtr storage, BlockInputStreams streams, QueryProcessingStage::Enum to_stage)
{
return std::make_shared<ProxyStorage>(std::move(storage), std::move(streams), to_stage);
}
private:
StoragePtr storage;
BlockInputStreams streams;
QueryProcessingStage::Enum to_stage;
};
}

View File

@ -0,0 +1,405 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/LiveViewBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Common/typeid_cast.h>
#include <Storages/StorageLiveView.h>
#include <Storages/StorageFactory.h>
#include <Storages/ProxyStorage.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW;
}
/// Finds the table that the SELECT of a live view reads from, descending into
/// subqueries in the FROM clause.
/// On input `select_database_name` holds the default database; on output both names
/// describe the table that was found. When the table is given without a database,
/// the default database is also written into the query by AddDefaultDatabaseVisitor.
/// Nothing is changed if the query has neither a table nor a table expression.
/// Throws if a subquery is a UNION or if the table expression is not a select.
static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name)
{
    auto table_id = getDatabaseAndTable(query, 0);
    ASTPtr table_expression = extractTableExpression(query, 0);

    if (table_id)
    {
        select_table_name = table_id->table;

        if (!table_id->database.empty())
        {
            select_database_name = table_id->database;
            return;
        }

        table_id->database = select_database_name;
        AddDefaultDatabaseVisitor visitor(select_database_name);
        visitor.visit(query);
        return;
    }

    if (!table_expression)
        return;

    auto * ast_select = table_expression->as<ASTSelectWithUnionQuery>();
    if (!ast_select)
        throw Exception("Logical error while creating StorageLiveView."
            " Could not retrieve table name from select query.",
            DB::ErrorCodes::LOGICAL_ERROR);

    if (ast_select->list_of_selects->children.size() != 1)
        throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);

    auto & inner_query = ast_select->list_of_selects->children.at(0);
    extractDependentTable(inner_query->as<ASTSelectQuery &>(), select_database_name, select_table_name);
}
static void checkAllowedQueries(const ASTSelectQuery & query)
{
if (query.prewhere() || query.final() || query.sample_size())
throw Exception("LIVE VIEW cannot have PREWHERE, SAMPLE or FINAL.", DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
ASTPtr subquery = extractTableExpression(query, 0);
if (!subquery)
return;
if (const auto * ast_select = subquery->as<ASTSelectWithUnionQuery>())
{
if (ast_select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
const auto & inner_query = ast_select->list_of_selects->children.at(0);
checkAllowedQueries(inner_query->as<ASTSelectQuery &>());
}
}
/// Creates the live view from its CREATE query: remembers the inner SELECT,
/// determines the table it depends on and registers that dependency in the global context.
/// Throws if the query has no SELECT or if the SELECT is a UNION.
StorageLiveView::StorageLiveView(
    const String & table_name_,
    const String & database_name_,
    Context & local_context,
    const ASTCreateQuery & query,
    const ColumnsDescription & columns)
    : IStorage(columns)
    , table_name(table_name_)
    , database_name(database_name_)
    , global_context(local_context.getGlobalContext())
{
    if (!query.select)
        throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

    /// Default value, if only table name exist in the query
    select_database_name = local_context.getCurrentDatabase();

    const auto & selects = query.select->list_of_selects->children;
    if (selects.size() != 1)
        throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);

    inner_query = selects.at(0);
    extractDependentTable(typeid_cast<ASTSelectQuery &>(*inner_query), select_database_name, select_table_name);

    /// If the table is not specified - use the table `system.one`
    if (select_table_name.empty())
    {
        select_database_name = "system";
        select_table_name = "one";
    }

    /// This view depends on the table its SELECT reads from.
    global_context.addDependency(
        DatabaseAndTableName(select_database_name, select_table_name),
        DatabaseAndTableName(database_name, table_name));

    is_temporary = query.temporary;

    /// The slot for the published result starts out empty; the "active" token starts out true.
    blocks_ptr = std::make_shared<BlocksPtr>();
    active_ptr = std::make_shared<bool>(true);
}
/// Returns the sample block of the inner SELECT.
/// It is computed on the first call and cached in `sample_block` afterwards.
Block StorageLiveView::getHeader() const
{
    if (sample_block)
        return sample_block;

    auto source_storage = global_context.getTable(select_database_name, select_table_name);
    sample_block = InterpreterSelectQuery(inner_query, global_context, source_storage).getSampleBlock();
    return sample_block;
}
/** Re-executes the inner SELECT and publishes its result if it has changed.
  *
  * The query is run in two steps: first up to the WithMergeableState stage, whose
  * blocks are stored in `mergeable_blocks`, and then to the Complete stage on top
  * of those blocks through a ProxyStorage.
  *
  * The result is published through `*blocks_ptr` only when its hash differs from
  * the hash of the previously published result; an empty result is published as
  * the header block alone.
  *
  * Returns true if a new result was published. Callers in this file hold `mutex`
  * while calling it.
  */
bool StorageLiveView::getNewBlocks()
{
    SipHash hash;
    UInt128 key;
    BlocksPtr new_blocks = std::make_shared<Blocks>();
    BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();

    /// Step 1: run the query up to the mergeable state and keep the intermediate blocks.
    InterpreterSelectQuery interpreter(inner_query->clone(), global_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
    auto mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);

    while (Block block = mergeable_stream->read())
        new_mergeable_blocks->push_back(std::move(block));

    mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
    mergeable_blocks->push_back(new_mergeable_blocks);

    /// Step 2: finish the query on top of the intermediate blocks.
    BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader());
    auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_database_name, select_table_name), {from}, QueryProcessingStage::WithMergeableState);
    InterpreterSelectQuery select(inner_query->clone(), global_context, proxy_storage, SelectQueryOptions(QueryProcessingStage::Complete));
    BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);

    while (Block block = data->read())
    {
        block.updateHash(hash);
        new_blocks->push_back(std::move(block));
    }

    hash.get128(key.low, key.high);
    const auto new_hash_key = key.toHexString();

    /// Update blocks only if hash keys do not match
    /// NOTE: hash could be different for the same result
    /// if blocks are not in the same order
    if (hash_key == new_hash_key)
        return false;

    if (new_blocks->empty())
        new_blocks->push_back(getHeader());

    (*blocks_ptr) = new_blocks;
    hash_key = new_hash_key;
    return true;
}
void StorageLiveView::checkTableCanBeDropped() const
{
Dependencies dependencies = global_context.getDependencies(database_name, table_name);
if (!dependencies.empty())
{
DatabaseAndTableName database_and_table_name = dependencies.front();
throw Exception("Table has dependency " + database_and_table_name.first + "." + database_and_table_name.second, ErrorCodes::TABLE_WAS_NOT_DROPPED);
}
}
/// Body of the background thread started by startNoUsersThread().
/// Waits on noUsersThreadCondition for `temporary_live_view_timeout` seconds.
/// If the wait times out while the view has no users and no dependencies,
/// the view drops itself by executing a DROP query. If the thread is woken up
/// (flag set or condition signaled) it exits without dropping anything.
void StorageLiveView::noUsersThread()
{
if (shutdown_called)
return;
bool drop_table = false;
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
while (1)
{
/// The body runs only when nobody asked for a wake up and the wait timed out;
/// in every other case control falls through to `break` below.
if (!noUsersThreadWakeUp && !noUsersThreadCondition.tryWait(noUsersThreadMutex, global_context.getSettingsRef().temporary_live_view_timeout.totalSeconds() * 1000))
{
noUsersThreadWakeUp = false;
if (shutdown_called)
return;
if (hasUsers())
return;
/// Something still depends on this view: wait for another timeout period.
if (!global_context.getDependencies(database_name, table_name).empty())
continue;
drop_table = true;
}
break;
}
}
/// The drop is executed after noUsersThreadMutex has been released.
if (drop_table)
{
if (global_context.tryGetTable(database_name, table_name))
{
try
{
/// We create and execute `drop` query for this table
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = database_name;
drop_query->table = table_name;
ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, global_context);
drop_interpreter.execute();
}
catch (...)
{
/// Any failure of the drop is ignored here and is not reported anywhere.
/// NOTE(review): presumably deliberate best effort - consider logging the exception.
}
}
}
}
/// (Re)starts the background thread that drops a temporary view once it has no users.
/// Does nothing for non-temporary or dropped views. A previously started thread
/// is woken up and joined before a new one is created.
void StorageLiveView::startNoUsersThread()
{
/// Only one caller at a time gets past this point; concurrent callers return immediately.
bool expected = false;
if (!startnousersthread_called.compare_exchange_strong(expected, true))
return;
/// NOTE(review): this return leaves startnousersthread_called set to true, so later calls
/// return at the check above - they would return here anyway while is_dropped stays set.
if (is_dropped)
return;
if (is_temporary)
{
if (no_users_thread.joinable())
{
/// Wake the previous thread up so that it exits, then wait for it.
/// The lock is released before join(), as the thread body takes the same mutex.
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = true;
noUsersThreadCondition.signal();
}
no_users_thread.join();
}
/// Clear the wake up request before the new thread starts waiting.
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = false;
}
if (!is_dropped)
no_users_thread = std::thread(&StorageLiveView::noUsersThread, this);
}
/// Let the next caller in.
startnousersthread_called = false;
}
/// Startup hook of the storage: arms the no-users thread
/// (startNoUsersThread() starts a thread only for temporary views).
void StorageLiveView::startup()
{
    startNoUsersThread();
}
/// Shuts the storage down: marks it as shut down and asks the no-users thread to exit.
/// Only the first call has any effect.
void StorageLiveView::shutdown()
{
/// Atomically flip shutdown_called from false to true; later calls return here.
bool expected = false;
if (!shutdown_called.compare_exchange_strong(expected, true))
return;
if (no_users_thread.joinable())
{
/// Wake the thread up while holding the mutex its wait is associated with.
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = true;
noUsersThreadCondition.signal();
/// Must detach the no users thread
/// as we can't join it as it will result
/// in a deadlock
no_users_thread.detach();
}
}
/// Makes sure the storage is shut down before it is destroyed;
/// shutdown() itself ignores every call after the first.
StorageLiveView::~StorageLiveView()
{
    shutdown();
}
void StorageLiveView::drop()
{
global_context.removeDependency(
DatabaseAndTableName(select_database_name, select_table_name),
DatabaseAndTableName(database_name, table_name));
Poco::FastMutex::ScopedLock lock(mutex);
is_dropped = true;
condition.broadcast();
}
/// Recomputes the result of the view and notifies waiters if it has changed.
/// Runs under the alter intention lock of the current query and under `mutex`.
void StorageLiveView::refresh(const Context & context)
{
    auto alter_lock = lockAlterIntention(context.getCurrentQueryId());

    Poco::FastMutex::ScopedLock blocks_lock(mutex);
    const bool updated = getNewBlocks();
    if (updated)
        condition.broadcast();
}
/// Returns a single stream over the result currently published by the view.
/// The result is computed first if nothing has been published yet.
/// All arguments are ignored.
BlockInputStreams StorageLiveView::read(
    const Names & /*column_names*/,
    const SelectQueryInfo & /*query_info*/,
    const Context & /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    const size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    std::shared_ptr<BlocksBlockInputStream> stream;
    {
        Poco::FastMutex::ScopedLock lock(mutex);
        if (!(*blocks_ptr))
        {
            if (getNewBlocks())
                condition.broadcast();
        }
        /// The stream is created while the mutex is still held: its constructor reads
        /// `*blocks_ptr`, which getNewBlocks() replaces under this same mutex.
        /// Passing blocks_ptr also adds this stream as a user of the blocks.
        stream = std::make_shared<BlocksBlockInputStream>(blocks_ptr, getHeader());
    }
    return { stream };
}
/// Creates the stream that serves a WATCH query on this view.
/// The LIMIT of the query, if any, bounds the number of updates the stream returns.
/// Wakes up the no-users thread, computes the result if nothing has been published
/// yet, and sets `processed_stage` to Complete.
BlockInputStreams StorageLiveView::watch(
    const Names & /*column_names*/,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum & processed_stage,
    size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    ASTWatchQuery & watch_query = typeid_cast<ASTWatchQuery &>(*query_info.query);

    /// By default infinite stream of updates
    int64_t updates_limit = -2;
    if (watch_query.limit_length)
    {
        auto & limit_literal = typeid_cast<ASTLiteral &>(*watch_query.limit_length);
        updates_limit = static_cast<int64_t>(safeGet<UInt64>(limit_literal.value));
    }

    auto reader = std::make_shared<LiveViewBlockInputStream>(
        *this, blocks_ptr, active_ptr, condition, mutex, updates_limit, context.getSettingsRef().heartbeat_delay);

    /// Wake up the no-users thread, if one is running.
    if (no_users_thread.joinable())
    {
        Poco::FastMutex::ScopedLock wake_lock(noUsersThreadMutex);
        noUsersThreadWakeUp = true;
        noUsersThreadCondition.signal();
    }

    /// Compute the result if nothing has been published yet.
    {
        Poco::FastMutex::ScopedLock blocks_lock(mutex);
        if (!(*blocks_ptr) && getNewBlocks())
            condition.broadcast();
    }

    processed_stage = QueryProcessingStage::Complete;
    return { reader };
}
/// Returns a LiveViewBlockOutputStream bound to this storage; both arguments are ignored.
BlockOutputStreamPtr StorageLiveView::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
    BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(*this);
    return output;
}
/// Registers the "LiveView" storage engine in the factory.
void registerStorageLiveView(StorageFactory & factory)
{
    auto create_live_view = [](const StorageFactory::Arguments & args)
    {
        return StorageLiveView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns);
    };

    factory.registerStorage("LiveView", create_live_view);
}
}

View File

@ -0,0 +1,294 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Poco/Condition.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/BlocksBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <ext/shared_ptr_helper.h>
#include <Common/SipHash.h>
#include <Storages/IStorage.h>
#include <Storages/ProxyStorage.h>
namespace DB
{
/// Forward declaration of the AST node base class and the shared-pointer alias
/// that the declarations below use for queries.
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
/** Storage behind the "LiveView" engine.
  *
  * Holds the current result blocks of the inner query (blocks_ptr), the intermediate
  * "mergeable" blocks used to recompute that result (mergeable_blocks), and the
  * mutex/condition pair used to tell readers that new blocks are available.
  */
class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageLiveView>;
friend class LiveViewBlockOutputStream;

public:
    ~StorageLiveView() override;

    String getName() const override { return "LiveView"; }
    String getTableName() const override { return table_name; }
    String getDatabaseName() const { return database_name; }
    String getSelectDatabaseName() const { return select_database_name; }
    String getSelectTableName() const { return select_table_name; }

    // const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }

    /// Returns a clone of the inner query, so callers never share the stored AST.
    ASTPtr getInnerQuery() const { return inner_query->clone(); }

    /// It is passed inside the query and solved at its level.
    bool supportsSampling() const override { return true; }
    bool supportsFinal() const override { return true; }

    /// Mutex for the blocks and ready condition
    Poco::FastMutex mutex;
    /// New blocks ready condition to broadcast to readers
    /// that new blocks are available
    Poco::Condition condition;

    bool isTemporary() { return is_temporary; }

    /// Check if we have any readers
    /// (someone besides this storage holds blocks_ptr);
    /// must be called with mutex locked
    bool hasUsers()
    {
        return blocks_ptr.use_count() > 1;
    }

    /// Check if we have any active readers
    /// (someone besides this storage holds active_ptr);
    /// must be called with mutex locked
    bool hasActiveUsers()
    {
        return active_ptr.use_count() > 1;
    }

    /// Background thread for temporary tables
    /// which drops this table if there are no users
    void startNoUsersThread();
    Poco::FastMutex noUsersThreadMutex;
    bool noUsersThreadWakeUp{false};
    Poco::Condition noUsersThreadCondition;

    /// Returns a copy of the hash key of the current blocks.
    /// NOTE(review): reads hash_key without taking the mutex itself, while reset()
    /// documents that it is called with the mutex locked — confirm callers' locking.
    String getBlocksHashKey()
    {
        return hash_key;
    }

    /// Reset blocks: drops the current blocks, the mergeable blocks and the hash key;
    /// must be called with mutex locked
    void reset()
    {
        (*blocks_ptr).reset();
        mergeable_blocks.reset();
        hash_key = "";
    }

    void checkTableCanBeDropped() const override;
    void drop() override;
    void startup() override;
    void shutdown() override;

    void refresh(const Context & context);

    BlockOutputStreamPtr write(
        const ASTPtr &,
        const Context &) override;

    BlockInputStreams read(
        const Names & column_names,
        const SelectQueryInfo & query_info,
        const Context & context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

    BlockInputStreams watch(
        const Names & column_names,
        const SelectQueryInfo & query_info,
        const Context & context,
        QueryProcessingStage::Enum & processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

    std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
    BlocksPtrs getMergeableBlocks() { return mergeable_blocks; }
    std::shared_ptr<bool> getActivePtr() { return active_ptr; }

    /// Read new data blocks that store query result
    bool getNewBlocks();

    Block getHeader() const;

    /** Pushes `block` through the inner query of `live_view` and writes the resulting
      * blocks to `output`.
      *
      * Steps:
      *  1. If the view has no active users, reset its blocks and return.
      *  2. Run the inner query over `block` alone up to the mergeable state.
      *  3. Combine the new mergeable blocks with the view's mergeable blocks
      *     (rebuilt from the inner query when missing, empty or holding 64+ groups).
      *  4. Run the inner query to completion over the combined mergeable blocks,
      *     hashing the result.
      *  5. If the hash differs from the view's current hash key, copy the result
      *     into `output` while holding the view mutex.
      *
      * @param live_view  storage whose result is recomputed
      * @param block      newly written data to merge into the result
      * @param context    query context used to resolve the source table and run queries
      * @param output     stream that receives the recomputed result blocks
      */
    static void writeIntoLiveView(StorageLiveView & live_view,
                                  const Block & block,
                                  const Context & context,
                                  BlockOutputStreamPtr & output)
    {
        /// Check if live view has any readers if not
        /// just reset blocks to empty and do nothing else
        /// When first reader comes the blocks will be read.
        {
            Poco::FastMutex::ScopedLock lock(live_view.mutex);
            if (!live_view.hasActiveUsers())
            {
                live_view.reset();
                return;
            }
        }

        SipHash hash;
        UInt128 key;
        BlockInputStreams from;
        BlocksPtr blocks = std::make_shared<Blocks>();
        BlocksPtrs mergeable_blocks;
        BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();

        /// Run the inner query over the new block only, stopping at the mergeable state.
        {
            auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
            BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
            auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns);
            InterpreterSelectQuery select_block(live_view.getInnerQuery(), context, proxy_storage,
                QueryProcessingStage::WithMergeableState);
            auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(select_block.execute().in);
            while (Block this_block = data_mergeable_stream->read())
                new_mergeable_blocks->push_back(this_block);
        }

        if (new_mergeable_blocks->empty())
            return;

        {
            Poco::FastMutex::ScopedLock lock(live_view.mutex);

            mergeable_blocks = live_view.getMergeableBlocks();
            /// An empty list is rebuilt as well: the code below needs a first (base) group.
            if (!mergeable_blocks || mergeable_blocks->empty() || mergeable_blocks->size() >= 64)
            {
                /// NOTE(review): the rebuilt list is held only in this local variable;
                /// nothing here stores it back on live_view — confirm that is intended.
                mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
                BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
                InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names{});
                auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
                while (Block this_block = view_mergeable_stream->read())
                    base_mergeable_blocks->push_back(this_block);
                mergeable_blocks->push_back(base_mergeable_blocks);
            }

            /// The pointee is owned by a shared pointer, so this reference stays valid
            /// when the outer vector grows below.
            Blocks & base_blocks = *mergeable_blocks->front();

            /// Need make new mergeable block structure match the other mergeable blocks
            /// (new_mergeable_blocks is known to be non-empty at this point).
            if (!base_blocks.empty())
            {
                Block & base_sample = base_blocks.front();
                /// Iterate over a copy because columns are erased from the new blocks themselves.
                Block new_sample = new_mergeable_blocks->front();
                for (const auto & col : new_sample)
                {
                    if (base_sample.has(col.name))
                        continue;
                    for (auto & new_block : *new_mergeable_blocks)
                        new_block.erase(col.name);
                }
            }

            /// Header shared by all streams. The base group may be empty; calling front()
            /// on it would be undefined, so fall back to the first new block in that case.
            const Block stream_header = (base_blocks.empty() ? new_mergeable_blocks->front() : base_blocks.front()).cloneEmpty();

            mergeable_blocks->push_back(new_mergeable_blocks);

            /// Create from blocks streams
            for (auto & blocks_group : *mergeable_blocks)
            {
                BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_group), stream_header);
                from.push_back(std::move(stream));
            }
        }

        /// Finish the inner query over the combined mergeable blocks.
        auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
        auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState);
        InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete);
        BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
        while (Block this_block = data->read())
        {
            this_block.updateHash(hash);
            blocks->push_back(this_block);
        }

        /// The query produced no blocks: there is no first block to take a header
        /// from and nothing to copy into the output.
        if (blocks->empty())
            return;

        /// get hash key
        hash.get128(key.low, key.high);

        /// Update blocks only if hash keys do not match
        /// NOTE: hash could be different for the same result
        /// if blocks are not in the same order
        if (live_view.getBlocksHashKey() != key.toHexString())
        {
            auto sample_block = blocks->front().cloneEmpty();
            BlockInputStreamPtr new_data = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks), sample_block);
            {
                Poco::FastMutex::ScopedLock lock(live_view.mutex);
                copyData(*new_data, *output);
            }
        }
    }

private:
    String select_database_name;
    String select_table_name;
    String table_name;
    String database_name;
    ASTPtr inner_query;
    Context & global_context;
    bool is_temporary {false};
    mutable Block sample_block;

    /// Active users
    std::shared_ptr<bool> active_ptr;
    /// Current data blocks that store query result
    std::shared_ptr<BlocksPtr> blocks_ptr;
    BlocksPtr new_blocks;
    BlocksPtrs mergeable_blocks;

    /// Current blocks hash key
    String hash_key;
    String new_hash_key;

    void noUsersThread();
    std::thread no_users_thread;
    std::atomic<bool> shutdown_called{false};
    std::atomic<bool> startnousersthread_called{false};

    StorageLiveView(
        const String & table_name_,
        const String & database_name_,
        Context & local_context,
        const ASTCreateQuery & query,
        const ColumnsDescription & columns
    );
};
class LiveViewBlockOutputStream : public IBlockOutputStream
{
public:
explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {}
void write(const Block & block) override
{
if (!new_blocks)
new_blocks = std::make_shared<Blocks>();
new_blocks->push_back(block);
// FIXME: do I need to calculate block hash?
(*storage.blocks_ptr) = new_blocks;
new_blocks.reset();
storage.condition.broadcast();
}
Block getHeader() const override { return storage.getHeader(); }
private:
BlocksPtr new_blocks;
String new_hash_key;
StorageLiveView & storage;
};
}