2019-05-26 22:03:30 +00:00
|
|
|
/* Copyright (c) 2018 BlackBerry Limited
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License. */
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include <limits>
|
|
|
|
|
|
|
|
#include <Common/ConcurrentBoundedQueue.h>
|
|
|
|
#include <Poco/Condition.h>
|
|
|
|
#include <DataStreams/OneBlockInputStream.h>
|
|
|
|
#include <DataStreams/IBlockInputStream.h>
|
|
|
|
#include <Storages/StorageLiveView.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/**
|
|
|
|
*/
|
|
|
|
|
|
|
|
class LiveViewBlockInputStream : public IBlockInputStream
|
|
|
|
{
|
|
|
|
|
|
|
|
using NonBlockingResult = std::pair<Block, bool>;
|
|
|
|
|
|
|
|
public:
|
|
|
|
~LiveViewBlockInputStream() override
|
|
|
|
{
|
|
|
|
/// Start storage no users thread
|
|
|
|
/// if we are the last active user
|
2019-06-12 13:11:44 +00:00
|
|
|
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
|
|
|
storage->startNoUsersThread(temporary_live_view_timeout);
|
2019-05-26 22:03:30 +00:00
|
|
|
}
|
|
|
|
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
|
|
|
|
/// and LIMIT 0 just returns data without waiting for any updates
|
2019-06-12 13:11:44 +00:00
|
|
|
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
|
|
|
|
std::shared_ptr<BlocksPtr> blocks_ptr_,
|
2019-05-28 21:17:48 +00:00
|
|
|
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
2019-06-12 13:11:44 +00:00
|
|
|
std::shared_ptr<bool> active_ptr_,
|
2019-06-07 01:10:56 +00:00
|
|
|
int64_t length_, const UInt64 & heartbeat_interval_,
|
|
|
|
const UInt64 & temporary_live_view_timeout_)
|
2019-06-12 13:11:44 +00:00
|
|
|
: storage(storage_), blocks_ptr(blocks_ptr_), blocks_metadata_ptr(blocks_metadata_ptr_), active_ptr(active_ptr_), length(length_ + 1), heartbeat_interval(heartbeat_interval_ * 1000000), temporary_live_view_timeout(temporary_live_view_timeout_),
|
2019-06-07 01:10:56 +00:00
|
|
|
blocks_hash("")
|
2019-05-26 22:03:30 +00:00
|
|
|
{
|
|
|
|
/// grab active pointer
|
|
|
|
active = active_ptr.lock();
|
|
|
|
}
|
|
|
|
|
|
|
|
String getName() const override { return "LiveViewBlockInputStream"; }
|
|
|
|
|
|
|
|
void cancel(bool kill) override
|
|
|
|
{
|
2019-06-12 13:11:44 +00:00
|
|
|
if (isCancelled() || storage->is_dropped)
|
2019-05-26 22:03:30 +00:00
|
|
|
return;
|
|
|
|
IBlockInputStream::cancel(kill);
|
2019-06-12 13:11:44 +00:00
|
|
|
Poco::FastMutex::ScopedLock lock(storage->mutex);
|
|
|
|
storage->condition.broadcast();
|
2019-05-26 22:03:30 +00:00
|
|
|
}
|
|
|
|
|
2019-06-12 13:11:44 +00:00
|
|
|
Block getHeader() const override { return storage->getHeader(); }
|
2019-05-26 22:03:30 +00:00
|
|
|
|
|
|
|
void refresh()
|
|
|
|
{
|
|
|
|
if (active && blocks && it == end)
|
|
|
|
it = blocks->begin();
|
|
|
|
}
|
|
|
|
|
|
|
|
void suspend()
|
|
|
|
{
|
|
|
|
active.reset();
|
|
|
|
}
|
|
|
|
|
|
|
|
void resume()
|
|
|
|
{
|
|
|
|
active = active_ptr.lock();
|
|
|
|
{
|
|
|
|
if (!blocks || blocks.get() != (*blocks_ptr).get())
|
|
|
|
blocks = (*blocks_ptr);
|
|
|
|
}
|
|
|
|
it = blocks->begin();
|
|
|
|
begin = blocks->begin();
|
|
|
|
end = blocks->end();
|
|
|
|
}
|
|
|
|
|
|
|
|
NonBlockingResult tryRead()
|
|
|
|
{
|
|
|
|
return tryRead_(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
protected:
|
|
|
|
Block readImpl() override
|
|
|
|
{
|
|
|
|
/// try reading
|
|
|
|
return tryRead_(true).first;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** tryRead method attempts to read a block in either blocking
|
|
|
|
* or non-blocking mode. If blocking is set to false
|
|
|
|
* then method return empty block with flag set to false
|
|
|
|
* to indicate that method would block to get the next block.
|
|
|
|
*/
|
|
|
|
NonBlockingResult tryRead_(bool blocking)
|
|
|
|
{
|
|
|
|
Block res;
|
|
|
|
|
|
|
|
if (length == 0)
|
|
|
|
{
|
|
|
|
return { Block(), true };
|
|
|
|
}
|
|
|
|
/// If blocks were never assigned get blocks
|
|
|
|
if (!blocks)
|
|
|
|
{
|
2019-06-12 13:11:44 +00:00
|
|
|
Poco::FastMutex::ScopedLock lock(storage->mutex);
|
2019-05-26 22:03:30 +00:00
|
|
|
if (!active)
|
|
|
|
return { Block(), false };
|
|
|
|
blocks = (*blocks_ptr);
|
|
|
|
it = blocks->begin();
|
|
|
|
begin = blocks->begin();
|
|
|
|
end = blocks->end();
|
|
|
|
}
|
|
|
|
|
2019-06-12 13:11:44 +00:00
|
|
|
if (isCancelled() || storage->is_dropped)
|
2019-05-26 22:03:30 +00:00
|
|
|
{
|
|
|
|
return { Block(), true };
|
|
|
|
}
|
|
|
|
|
|
|
|
if (it == end)
|
|
|
|
{
|
|
|
|
{
|
2019-06-12 13:11:44 +00:00
|
|
|
Poco::FastMutex::ScopedLock lock(storage->mutex);
|
2019-05-26 22:03:30 +00:00
|
|
|
if (!active)
|
|
|
|
return { Block(), false };
|
|
|
|
/// If we are done iterating over our blocks
|
|
|
|
/// and there are new blocks availble then get them
|
|
|
|
if (blocks.get() != (*blocks_ptr).get())
|
|
|
|
{
|
|
|
|
blocks = (*blocks_ptr);
|
|
|
|
it = blocks->begin();
|
|
|
|
begin = blocks->begin();
|
|
|
|
end = blocks->end();
|
|
|
|
}
|
|
|
|
/// No new blocks available wait for new ones
|
|
|
|
else
|
|
|
|
{
|
|
|
|
if (!blocking)
|
|
|
|
{
|
|
|
|
return { Block(), false };
|
|
|
|
}
|
2019-06-05 11:30:29 +00:00
|
|
|
if (!end_of_blocks)
|
|
|
|
{
|
|
|
|
end_of_blocks = true;
|
|
|
|
return { getHeader(), true };
|
|
|
|
}
|
2019-05-26 22:03:30 +00:00
|
|
|
while (true)
|
|
|
|
{
|
2019-06-11 12:27:47 +00:00
|
|
|
UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
|
2019-06-12 13:11:44 +00:00
|
|
|
bool signaled = storage->condition.tryWait(storage->mutex, std::max(static_cast<UInt64>(0), heartbeat_interval - (timestamp_usec - last_event_timestamp)) / 1000);
|
2019-05-26 22:03:30 +00:00
|
|
|
|
2019-06-12 13:11:44 +00:00
|
|
|
if (isCancelled() || storage->is_dropped)
|
2019-05-26 22:03:30 +00:00
|
|
|
{
|
|
|
|
return { Block(), true };
|
|
|
|
}
|
|
|
|
if (signaled)
|
|
|
|
{
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2019-06-05 11:30:29 +00:00
|
|
|
// heartbeat
|
2019-06-11 12:27:47 +00:00
|
|
|
last_event_timestamp = static_cast<UInt64>(timestamp.epochMicroseconds());
|
2019-06-05 11:30:29 +00:00
|
|
|
return { getHeader(), true };
|
2019-05-26 22:03:30 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return tryRead_(blocking);
|
|
|
|
}
|
|
|
|
|
|
|
|
res = *it;
|
|
|
|
|
|
|
|
++it;
|
|
|
|
|
|
|
|
if (it == end)
|
|
|
|
{
|
2019-06-05 11:30:29 +00:00
|
|
|
end_of_blocks = false;
|
2019-05-26 22:03:30 +00:00
|
|
|
if (length > 0)
|
|
|
|
--length;
|
|
|
|
}
|
|
|
|
|
2019-06-11 12:27:47 +00:00
|
|
|
last_event_timestamp = static_cast<UInt64>(timestamp.epochMicroseconds());
|
2019-05-26 22:03:30 +00:00
|
|
|
return { res, true };
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
2019-06-12 13:11:44 +00:00
|
|
|
std::shared_ptr<StorageLiveView> storage;
|
2019-05-26 22:03:30 +00:00
|
|
|
std::shared_ptr<BlocksPtr> blocks_ptr;
|
2019-05-28 21:17:48 +00:00
|
|
|
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
2019-05-26 22:03:30 +00:00
|
|
|
std::weak_ptr<bool> active_ptr;
|
|
|
|
std::shared_ptr<bool> active;
|
|
|
|
BlocksPtr blocks;
|
2019-05-28 21:17:48 +00:00
|
|
|
BlocksMetadataPtr blocks_metadata;
|
2019-05-26 22:03:30 +00:00
|
|
|
Blocks::iterator it;
|
|
|
|
Blocks::iterator end;
|
|
|
|
Blocks::iterator begin;
|
|
|
|
/// Length specifies number of updates to send, default -1 (no limit)
|
|
|
|
int64_t length;
|
2019-06-05 11:30:29 +00:00
|
|
|
bool end_of_blocks{0};
|
2019-06-07 01:10:56 +00:00
|
|
|
UInt64 heartbeat_interval;
|
|
|
|
UInt64 temporary_live_view_timeout;
|
2019-05-26 22:03:30 +00:00
|
|
|
String blocks_hash;
|
|
|
|
UInt64 last_event_timestamp{0};
|
|
|
|
Poco::Timestamp timestamp;
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|