multi 'WATCH' and 'TO' support

This commit is contained in:
Vxider 2020-02-13 02:08:52 +08:00
parent 4658732e16
commit 4351a16103
4 changed files with 14 additions and 19 deletions

View File

@ -16,8 +16,8 @@ class BlocksListInputStream : public IBlockInputStream
{
public:
/// Acquires shared ownership of the blocks vector
BlocksListInputStream(BlocksListPtrs blocks_ptr_, Block header_, std::mutex &mutex_, UInt32 window_upper_bound_)
: blocks(blocks_ptr_), mutex(mutex_), window_upper_bound(window_upper_bound_), header(std::move(header_))
BlocksListInputStream(BlocksListPtrs blocks_ptr_, Block header_, UInt32 window_upper_bound_)
: blocks(blocks_ptr_), window_upper_bound(window_upper_bound_), header(std::move(header_))
{
it_blocks = blocks->begin();
end_blocks = blocks->end();
@ -46,7 +46,7 @@ protected:
IColumn::Filter filter(column_status->size(), 0);
auto & data = static_cast<const ColumnUInt32 &>(*column_status).getData();
{
std::unique_lock lock(mutex);
// std::unique_lock lock(mutex);
for (size_t i = 0; i < column_status->size(); ++i)
{
if (data[i] == window_upper_bound)
@ -121,7 +121,7 @@ private:
std::list<BlocksListPtr>::iterator end_blocks;
BlocksList::iterator it;
BlocksList::iterator end;
std::mutex & mutex;
// std::mutex & mutex;
UInt32 window_upper_bound;
Block header;
};

View File

@ -420,12 +420,6 @@ BlockInputStreams StorageWindowView::watch(
size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
if (!target_table_id.empty())
throw Exception("WATCH query is disabled for " + getName() + " when constructed with 'TO' clause.", ErrorCodes::INCORRECT_QUERY);
if (active_ptr.use_count() > 1)
throw Exception("WATCH query is already attached, WINDOW VIEW only supports attaching one watch query.", ErrorCodes::INCORRECT_QUERY);
ASTWatchQuery & query = typeid_cast<ASTWatchQuery &>(*query_info.query);
bool has_limit = false;
@ -519,7 +513,7 @@ StorageWindowView::StorageWindowView(
}
else
{
if(query.storage->engine->name != "MergeTree")
if (query.storage->engine->name != "MergeTree")
throw Exception(
"The ENGINE of WindowView must be MergeTree family of table engines including the engines with replication support",
ErrorCodes::INCORRECT_QUERY);
@ -673,7 +667,7 @@ BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr()
BlockInputStreams from;
auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksListInputStream>(mergeable_blocks, sample_block_, mutex,w_upper_bound);
BlockInputStreamPtr stream = std::make_shared<BlocksListInputStream>(mergeable_blocks, sample_block_, w_upper_bound);
from.push_back(std::move(stream));
auto proxy_storage = std::make_shared<WindowViewProxyStorage>(
StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState);

View File

@ -64,11 +64,11 @@ public:
Block getHeader() const;
StoragePtr & getParentStorage()
StoragePtr& getParentStorage()
{
if (parent_storage == nullptr)
if (parent_storage == nullptr)
parent_storage = global_context.getTable(select_table_id);
return parent_storage;
return parent_storage;
}
StoragePtr& getInnerStorage()

View File

@ -58,7 +58,7 @@ protected:
/// If blocks were never assigned get blocks
if (!in_stream)
{
std::unique_lock lock(storage->mutex);
// std::unique_lock lock(storage->mutex);
in_stream = storage->getNewBlocksInputStreamPtr();
}
if (isCancelled() || storage->is_dropped)
@ -69,7 +69,7 @@ protected:
res = in_stream->read();
if (!res)
{
if (!active)
if (!(*active))
return Block();
if (!end_of_blocks)
@ -79,7 +79,7 @@ protected:
return getHeader();
}
std::unique_lock lock(storage->flushTableMutex);
std::unique_lock lock(mutex);
UInt64 timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
UInt64 w_end = static_cast<UInt64>(storage->getWindowUpperBound(static_cast<UInt32>(timestamp_usec / 1000000))) * 1000000;
storage->condition.wait_for(lock, std::chrono::microseconds(w_end - timestamp_usec));
@ -89,7 +89,7 @@ protected:
return Block();
}
{
std::unique_lock lock_(storage->mutex);
// std::unique_lock lock_(storage->mutex);
in_stream = storage->getNewBlocksInputStreamPtr();
}
@ -113,6 +113,7 @@ private:
std::shared_ptr<bool> active;
const bool has_limit;
const UInt64 limit;
std::mutex mutex;
Int64 num_updates = -1;
bool end_of_blocks = false;
BlockInputStreamPtr in_stream;