This commit is contained in:
Alexey Milovidov 2014-04-12 20:11:06 +04:00
parent 29186e8a17
commit b5c63dade9
3 changed files with 15 additions and 10 deletions

View File

@ -29,6 +29,9 @@ public:
std::string getTableName() const { return name; }
bool supportsSampling() const { return true; }
/// Проверка откладывается до метода read. Там проверяется поддержка PREWHERE у использующихся таблиц.
bool supportsPrewhere() const { return true; }
const NamesAndTypesList & getColumnsList() const { return *columns; }
NameAndTypePair getColumn(const String &column_name) const;
bool hasColumn(const String &column_name) const;

View File

@ -456,10 +456,10 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
if (query.final && (!storage || !storage->supportsFinal()))
throw Exception("Illegal FINAL", ErrorCodes::ILLEGAL_FINAL);
throw Exception(storage ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL", ErrorCodes::ILLEGAL_FINAL);
if (query.prewhere_expression && (!storage || !storage->supportsPrewhere()))
throw Exception("Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE);
throw Exception(storage ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE);
/** При распределённой обработке запроса, в потоках почти не делается вычислений,
* а делается ожидание и получение данных с удалённых серверов.

View File

@ -72,13 +72,17 @@ BlockInputStreams StorageMerge::read(
getSelectedTables(selected_tables);
}
/// Если в запросе используется PREWHERE, надо убедиться, что все таблицы это поддерживают.
if (dynamic_cast<const ASTSelectQuery &>(*query).prewhere_expression)
for (const auto & table : selected_tables)
if (!table->supportsPrewhere())
throw Exception("Storage " + table->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
TableLocks table_locks;
/// Нельзя, чтобы эти таблицы кто-нибудь удалил, пока мы их читаем.
for (auto table : selected_tables)
{
for (auto & table : selected_tables)
table_locks.push_back(table->lockStructure(false));
}
Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
BlockInputStreamPtr virtual_columns;
@ -92,10 +96,10 @@ BlockInputStreams StorageMerge::read(
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlocks<String>(virtual_columns, _table_column_name);
bool all_inclusive = (values.size() == virtual_columns_block.rows());
for (size_t i = 0; i < selected_tables.size(); ++i)
for (size_t i = 0, size = selected_tables.size(); i < size; ++i)
{
StoragePtr table = selected_tables[i];
auto table_lock = table_locks[i];
auto & table_lock = table_locks[i];
if (!all_inclusive && values.find(table->getTableName()) == values.end())
continue;
@ -114,12 +118,10 @@ BlockInputStreams StorageMerge::read(
settings,
tmp_processed_stage,
max_block_size,
selected_tables.size() > threads ? 1 : (threads / selected_tables.size()));
size > threads ? 1 : (threads / size));
for (auto & stream : source_streams)
{
stream->addTableLock(table_lock);
}
for (auto & virtual_column : virt_column_names)
{