* Adding support for CREATE TEMPORARY LIVE VIEW

* Fixing issue with setting _version virtual column
This commit is contained in:
Vitaliy Zakaznikov 2019-05-30 17:29:30 -04:00
parent 673d1a46a0
commit f06f0e3947
4 changed files with 26 additions and 13 deletions

View File

@ -436,7 +436,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
return; return;
} }
if (create.temporary) if (create.temporary && !create.is_live_view)
{ {
auto engine_ast = std::make_shared<ASTFunction>(); auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Memory"; engine_ast->name = "Memory";

View File

@ -23,6 +23,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int UNKNOWN_STORAGE; extern const int UNKNOWN_STORAGE;
extern const int UNKNOWN_TABLE;
extern const int TOO_MANY_COLUMNS; extern const int TOO_MANY_COLUMNS;
} }
@ -49,6 +50,11 @@ BlockIO InterpreterWatchQuery::execute()
/// Get storage /// Get storage
storage = context.tryGetTable(database, table); storage = context.tryGetTable(database, table);
if (!storage)
throw Exception("Table " + backQuoteIfNeed(database) + "." +
backQuoteIfNeed(table) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
/// List of columns to read to execute the query. /// List of columns to read to execute the query.
Names required_columns = storage->getColumns().getNamesOfPhysical(); Names required_columns = storage->getColumns().getNamesOfPhysical();

View File

@ -192,12 +192,13 @@ bool StorageLiveView::getNewBlocks()
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in); BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
while (Block block = data->read()) while (Block block = data->read())
{ {
/// calculate hash before virtual column is added
block.updateHash(hash);
/// add result version meta column /// add result version meta column
block.insert({DataTypeUInt64().createColumnConst( block.insert({DataTypeUInt64().createColumnConst(
block.rows(), getBlocksVersion() + 1)->convertToFullColumnIfConst(), block.rows(), getBlocksVersion() + 1)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(), std::make_shared<DataTypeUInt64>(),
"_version"}); "_version"});
block.updateHash(hash);
new_blocks->push_back(block); new_blocks->push_back(block);
} }
@ -411,8 +412,8 @@ BlockInputStreams StorageLiveView::watch(
Poco::FastMutex::ScopedLock lock(mutex); Poco::FastMutex::ScopedLock lock(mutex);
if (!(*blocks_ptr)) if (!(*blocks_ptr))
{ {
if (getNewBlocks()) if (getNewBlocks())
condition.broadcast(); condition.broadcast();
} }
} }
@ -435,8 +436,8 @@ BlockInputStreams StorageLiveView::watch(
Poco::FastMutex::ScopedLock lock(mutex); Poco::FastMutex::ScopedLock lock(mutex);
if (!(*blocks_ptr)) if (!(*blocks_ptr))
{ {
if (getNewBlocks()) if (getNewBlocks())
condition.broadcast(); condition.broadcast();
} }
} }

View File

@ -88,14 +88,16 @@ public:
Poco::FastMutex noUsersThreadMutex; Poco::FastMutex noUsersThreadMutex;
bool noUsersThreadWakeUp{false}; bool noUsersThreadWakeUp{false};
Poco::Condition noUsersThreadCondition; Poco::Condition noUsersThreadCondition;
/// Get blocks hash
/// must be called with mutex locked
String getBlocksHashKey() String getBlocksHashKey()
{ {
if (*blocks_metadata_ptr) if (*blocks_metadata_ptr)
return (*blocks_metadata_ptr)->hash; return (*blocks_metadata_ptr)->hash;
return ""; return "";
} }
/// Get blocks version
/// must be called with mutex locked
UInt64 getBlocksVersion() UInt64 getBlocksVersion()
{ {
if (*blocks_metadata_ptr) if (*blocks_metadata_ptr)
@ -295,7 +297,6 @@ public:
new_blocks = std::make_shared<Blocks>(); new_blocks = std::make_shared<Blocks>();
new_blocks_metadata = std::make_shared<BlocksMetadata>(); new_blocks_metadata = std::make_shared<BlocksMetadata>();
new_hash = std::make_shared<SipHash>(); new_hash = std::make_shared<SipHash>();
new_blocks_metadata->version = storage.getBlocksVersion() + 1;
} }
void writeSuffix() override void writeSuffix() override
@ -305,6 +306,15 @@ public:
new_hash->get128(key.low, key.high); new_hash->get128(key.low, key.high);
new_blocks_metadata->hash = key.toHexString(); new_blocks_metadata->hash = key.toHexString();
new_blocks_metadata->version = storage.getBlocksVersion() + 1;
for (auto & block : *new_blocks)
{
block.insert({DataTypeUInt64().createColumnConst(
block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
"_version"});
}
(*storage.blocks_ptr) = new_blocks; (*storage.blocks_ptr) = new_blocks;
(*storage.blocks_metadata_ptr) = new_blocks_metadata; (*storage.blocks_metadata_ptr) = new_blocks_metadata;
@ -319,10 +329,6 @@ public:
void write(const Block & block) override void write(const Block & block) override
{ {
new_blocks->push_back(block); new_blocks->push_back(block);
new_blocks->back().insert({DataTypeUInt64().createColumnConst(
block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
"_version"});
block.updateHash(*new_hash); block.updateHash(*new_hash);
} }