This commit is contained in:
Pavel Kartavyy 2014-10-08 23:00:44 +04:00
commit cc5629a06a
18 changed files with 190 additions and 104 deletions

View File

@ -18,6 +18,10 @@ public:
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
bool supportsSampling() const override { return data->supportsSampling(); }
bool supportsFinal() const override { return data->supportsFinal(); }
bool supportsPrewhere() const override { return data->supportsPrewhere(); }
BlockOutputStreamPtr write(ASTPtr query) override;
void drop() override;
bool optimize() override;

View File

@ -117,7 +117,8 @@ public:
UInt8 active_replicas;
};
void getStatus(Status & res);
/// Получить статус таблицы. Если with_zk_fields = false - не заполнять поля, требующие запросов в ZK.
void getStatus(Status & res, bool with_zk_fields = true);
private:
friend class ReplicatedMergeTreeBlockOutputStream;

View File

@ -13,12 +13,16 @@ public:
static StoragePtr create(const String & table_name_, const String & database_name_,
Context & context_, ASTPtr & query_, NamesAndTypesListPtr columns_);
virtual std::string getName() const override { return "View"; }
virtual std::string getTableName() const override { return table_name; }
std::string getName() const override { return "View"; }
std::string getTableName() const override { return table_name; }
const NamesAndTypesList & getColumnsList() const override { return *columns; }
DB::ASTPtr getInnerQuery() const { return inner_query.clone(); };
virtual BlockInputStreams read(
/// Пробрасывается внутрь запроса и решается на его уровне.
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
@ -26,7 +30,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
virtual void drop() override;
void drop() override;
protected:
String select_database_name;

View File

@ -136,6 +136,7 @@ static ASTPtr buildWhereExpression(const ASTs & functions)
bool filterBlockWithQuery(ASTPtr query, Block & block, const Context & context)
{
query = query->clone();
const ASTSelectQuery & select = typeid_cast<ASTSelectQuery & >(*query);
if (!select.where_expression && !select.prewhere_expression)
return false;

View File

@ -203,10 +203,6 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(streams);
/** Если данных нет. */
if (streams.empty())
return new NullBlockInputStream;
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
if (to_stage > QueryProcessingStage::FetchColumns)
@ -284,6 +280,14 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
chain.clear();
}
/** Если данных нет.
* Эта проверка специально вынесена чуть ниже, чем она могла бы быть (сразу после executeFetchColumns),
* чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов).
* Иначе мог бы вернуться пустой результат на некорректный запрос.
*/
if (streams.empty())
return new NullBlockInputStream;
/// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации).
if (has_having)
before_having->prependProjectInput();
@ -380,6 +384,10 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
}
}
/** Если данных нет. */
if (streams.empty())
return new NullBlockInputStream;
executeUnion(streams);
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();

View File

@ -214,15 +214,37 @@ bool Set::insertFromBlock(Block & block, bool create_ordered_set)
/** Чтобы корректно работали выражения вида 1.0 IN (1).
* Проверяет совместимость типов, проверяет попадание значений в диапазон допустимых значений типа, делает преобразование типа.
* Код слегка дурацкий.
*/
static Field convertToType(const Field & src, const IDataType & type)
{
if (type.behavesAsNumber())
{
if ( typeid_cast<const DataTypeUInt8 *>(&type)
|| typeid_cast<const DataTypeUInt16 *>(&type)
|| typeid_cast<const DataTypeUInt32 *>(&type)
|| typeid_cast<const DataTypeUInt64 *>(&type))
bool is_uint8 = false;
bool is_uint16 = false;
bool is_uint32 = false;
bool is_uint64 = false;
bool is_int8 = false;
bool is_int16 = false;
bool is_int32 = false;
bool is_int64 = false;
bool is_float32 = false;
bool is_float64 = false;
false
|| (is_uint8 = typeid_cast<const DataTypeUInt8 * >(&type))
|| (is_uint16 = typeid_cast<const DataTypeUInt16 * >(&type))
|| (is_uint32 = typeid_cast<const DataTypeUInt32 * >(&type))
|| (is_uint64 = typeid_cast<const DataTypeUInt64 * >(&type))
|| (is_int8 = typeid_cast<const DataTypeInt8 * >(&type))
|| (is_int16 = typeid_cast<const DataTypeInt16 * >(&type))
|| (is_int32 = typeid_cast<const DataTypeInt32 * >(&type))
|| (is_int64 = typeid_cast<const DataTypeInt64 * >(&type))
|| (is_float32 = typeid_cast<const DataTypeFloat32 * >(&type))
|| (is_float64 = typeid_cast<const DataTypeFloat64 * >(&type));
if (is_uint8 || is_uint16 || is_uint32 || is_uint64)
{
if (src.getType() == Field::Types::Int64)
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, signed literal at right");
@ -231,28 +253,52 @@ static Field convertToType(const Field & src, const IDataType & type)
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, floating point literal at right");
if (src.getType() == Field::Types::UInt64)
return src;
{
UInt64 value = src.get<const UInt64 &>();
if ((is_uint8 && value > std::numeric_limits<uint8_t>::max())
|| (is_uint16 && value > std::numeric_limits<uint16_t>::max())
|| (is_uint32 && value > std::numeric_limits<uint32_t>::max()))
throw Exception("Value (" + toString(value) + ") in IN section is out of range of type " + type.getName() + " at left");
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, " + Field::Types::toString(src.getType()) + " literal at right");
return src;
}
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, "
+ Field::Types::toString(src.getType()) + " literal at right");
}
else if (typeid_cast<const DataTypeInt8 *>(&type)
|| typeid_cast<const DataTypeInt16 *>(&type)
|| typeid_cast<const DataTypeInt32 *>(&type)
|| typeid_cast<const DataTypeInt64 *>(&type))
else if (is_int8 || is_int16 || is_int32 || is_int64)
{
if (src.getType() == Field::Types::Float64)
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, floating point literal at right");
if (src.getType() == Field::Types::UInt64)
return Field(src.get<Int64>());
{
UInt64 value = src.get<const UInt64 &>();
if ((is_int8 && value > uint8_t(std::numeric_limits<int8_t>::max()))
|| (is_int16 && value > uint16_t(std::numeric_limits<int16_t>::max()))
|| (is_int32 && value > uint32_t(std::numeric_limits<int32_t>::max()))
|| (is_int64 && value > uint64_t(std::numeric_limits<int64_t>::max())))
throw Exception("Value (" + toString(value) + ") in IN section is out of range of type " + type.getName() + " at left");
return Field(Int64(value));
}
if (src.getType() == Field::Types::Int64)
return src;
{
Int64 value = src.get<const Int64 &>();
if ((is_int8 && (value < std::numeric_limits<int8_t>::min() || value > std::numeric_limits<int8_t>::max()))
|| (is_int16 && (value < std::numeric_limits<int16_t>::min() || value > std::numeric_limits<int16_t>::max()))
|| (is_int32 && (value < std::numeric_limits<int32_t>::min() || value > std::numeric_limits<int32_t>::max())))
throw Exception("Value (" + toString(value) + ") in IN section is out of range of type " + type.getName() + " at left");
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, " + Field::Types::toString(src.getType()) + " literal at right");
return src;
}
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, "
+ Field::Types::toString(src.getType()) + " literal at right");
}
else if (typeid_cast<const DataTypeFloat32 *>(&type)
|| typeid_cast<const DataTypeFloat64 *>(&type))
else if (is_float32 || is_float64)
{
if (src.getType() == Field::Types::UInt64)
return Field(Float64(src.get<UInt64>()));
@ -263,7 +309,8 @@ static Field convertToType(const Field & src, const IDataType & type)
if (src.getType() == Field::Types::Float64)
return src;
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, " + Field::Types::toString(src.getType()) + " literal at right");
throw Exception("Type mismatch in IN section: " + type.getName() + " at left, "
+ Field::Types::toString(src.getType()) + " literal at right");
}
}

View File

@ -74,7 +74,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
if (!virt_column_names.empty())
VirtualColumnUtils::filterBlockWithQuery(query->clone(), virtual_columns_block, data.context);
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, data.context);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");

View File

@ -134,7 +134,7 @@ BlockInputStreams StorageChunkMerger::read(
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
if (has_virtual_column)
VirtualColumnUtils::filterBlockWithQuery(query->clone(), virtual_columns_block, context);
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, _table_column_name);

View File

@ -55,7 +55,7 @@ BlockInputStreams StorageChunks::read(
return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
Block virtual_columns_block = getBlockWithVirtualColumns();
if (!VirtualColumnUtils::filterBlockWithQuery(query->clone(), virtual_columns_block, context))
if (!VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context))
return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, _table_column_name);

View File

@ -85,7 +85,7 @@ BlockInputStreams StorageMerge::read(
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
if (!virt_column_names.empty())
VirtualColumnUtils::filterBlockWithQuery(query->clone(), virtual_columns_block, context);
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");

View File

@ -2208,7 +2208,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
if (!virt_column_names.empty())
VirtualColumnUtils::filterBlockWithQuery(query->clone(), virtual_columns_block, context);
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
std::multiset<UInt8> values = VirtualColumnUtils::extractSingleValueFromBlock<UInt8>(virtual_columns_block, "_replicated");
@ -2766,7 +2766,7 @@ void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
}
void StorageReplicatedMergeTree::getStatus(Status & res)
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
res.is_leader = is_leader_node;
res.is_readonly = is_read_only;
@ -2799,7 +2799,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res)
res.replica_path = replica_path;
res.columns_version = columns_version;
if (res.is_session_expired)
if (res.is_session_expired || !with_zk_fields)
{
res.log_max_index = 0;
res.log_pointer = 0;
@ -2809,10 +2809,19 @@ void StorageReplicatedMergeTree::getStatus(Status & res)
else
{
auto log_entries = zookeeper->getChildren(zookeeper_path + "/log");
const String & last_log_entry = *std::max_element(log_entries.begin(), log_entries.end());
res.log_max_index = parse<UInt64>(last_log_entry.substr(strlen("log-")));
res.log_pointer = parse<UInt64>(zookeeper->get(replica_path + "/log_pointer"));
if (log_entries.empty())
{
res.log_max_index = 0;
}
else
{
const String & last_log_entry = *std::max_element(log_entries.begin(), log_entries.end());
res.log_max_index = parse<UInt64>(last_log_entry.substr(strlen("log-")));
}
String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
res.log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
auto all_replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
res.total_replicas = all_replicas.size();

View File

@ -62,7 +62,7 @@ BlockInputStreams StorageSystemParts::read(
block.insert(ColumnWithNameAndType(database_column, new DataTypeString, "database"));
/// Отфильтруем блок со столбцом database.
VirtualColumnUtils::filterBlockWithQuery(query->clone(), block, context);
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
if (!block.rows())
return BlockInputStreams();
@ -129,7 +129,7 @@ BlockInputStreams StorageSystemParts::read(
}
/// Отфильтруем блок со столбцами database, table, engine, replicated и active.
VirtualColumnUtils::filterBlockWithQuery(query->clone(), block, context);
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
if (!block.rows())
return BlockInputStreams();

View File

@ -4,6 +4,7 @@
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/StorageSystemReplicas.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/VirtualColumnUtils.h>
namespace DB
@ -14,7 +15,7 @@ StorageSystemReplicas::StorageSystemReplicas(const std::string & name_, const Co
: name(name_), context(context_)
, columns{
{ "database", new DataTypeString },
{ "name", new DataTypeString },
{ "table", new DataTypeString },
{ "engine", new DataTypeString },
{ "is_leader", new DataTypeUInt8 },
{ "is_readonly", new DataTypeUInt8 },
@ -60,9 +61,48 @@ BlockInputStreams StorageSystemReplicas::read(
replicated_tables[db.first][table.first] = table.second;
}
/// Нужны ли столбцы, требующие для вычисления хождение в ZooKeeper.
bool with_zk_fields = false;
for (const auto & name : column_names)
{
if ( name == "log_max_index"
|| name == "log_pointer"
|| name == "total_replicas"
|| name == "active_replicas")
{
with_zk_fields = true;
break;
}
}
ColumnWithNameAndType col_database { new ColumnString, new DataTypeString, "database"};
ColumnWithNameAndType col_name { new ColumnString, new DataTypeString, "name"};
ColumnWithNameAndType col_table { new ColumnString, new DataTypeString, "table"};
ColumnWithNameAndType col_engine { new ColumnString, new DataTypeString, "engine"};
for (auto & db : replicated_tables)
{
for (auto & table : db.second)
{
col_database.column->insert(db.first);
col_table.column->insert(table.first);
col_engine.column->insert(table.second->getName());
}
}
/// Определяем, какие нужны таблицы, по условиям в запросе.
{
Block filtered_block { col_database, col_table, col_engine };
VirtualColumnUtils::filterBlockWithQuery(query, filtered_block, context);
if (!filtered_block.rows())
return BlockInputStreams();
col_database = filtered_block.getByName("database");
col_table = filtered_block.getByName("table");
col_engine = filtered_block.getByName("engine");
}
ColumnWithNameAndType col_is_leader { new ColumnUInt8, new DataTypeUInt8, "is_leader"};
ColumnWithNameAndType col_is_readonly { new ColumnUInt8, new DataTypeUInt8, "is_readonly"};
ColumnWithNameAndType col_is_session_expired{ new ColumnUInt8, new DataTypeUInt8, "is_session_expired"};
@ -80,39 +120,35 @@ BlockInputStreams StorageSystemReplicas::read(
ColumnWithNameAndType col_total_replicas { new ColumnUInt8, new DataTypeUInt8, "total_replicas"};
ColumnWithNameAndType col_active_replicas { new ColumnUInt8, new DataTypeUInt8, "active_replicas"};
for (auto & db : replicated_tables)
for (size_t i = 0, size = col_database.column->size(); i < size; ++i)
{
for (auto & table : db.second)
{
col_database.column->insert(db.first);
col_name.column->insert(table.first);
col_engine.column->insert(table.second->getName());
StorageReplicatedMergeTree::Status status;
typeid_cast<StorageReplicatedMergeTree &>(
*replicated_tables
[(*col_database.column)[i].safeGet<const String &>()]
[(*col_table.column)[i].safeGet<const String &>()]).getStatus(status, with_zk_fields);
StorageReplicatedMergeTree::Status status;
typeid_cast<StorageReplicatedMergeTree &>(*table.second).getStatus(status);
col_is_leader .column->insert(UInt64(status.is_leader));
col_is_readonly .column->insert(UInt64(status.is_readonly));
col_is_session_expired .column->insert(UInt64(status.is_session_expired));
col_future_parts .column->insert(UInt64(status.future_parts));
col_parts_to_check .column->insert(UInt64(status.parts_to_check));
col_zookeeper_path .column->insert(status.zookeeper_path);
col_replica_name .column->insert(status.replica_name);
col_replica_path .column->insert(status.replica_path);
col_columns_version .column->insert(Int64(status.columns_version));
col_queue_size .column->insert(UInt64(status.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.merges_in_queue));
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
col_total_replicas .column->insert(UInt64(status.total_replicas));
col_active_replicas .column->insert(UInt64(status.active_replicas));
}
col_is_leader .column->insert(UInt64(status.is_leader));
col_is_readonly .column->insert(UInt64(status.is_readonly));
col_is_session_expired .column->insert(UInt64(status.is_session_expired));
col_future_parts .column->insert(UInt64(status.future_parts));
col_parts_to_check .column->insert(UInt64(status.parts_to_check));
col_zookeeper_path .column->insert(status.zookeeper_path);
col_replica_name .column->insert(status.replica_name);
col_replica_path .column->insert(status.replica_path);
col_columns_version .column->insert(Int64(status.columns_version));
col_queue_size .column->insert(UInt64(status.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.merges_in_queue));
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
col_total_replicas .column->insert(UInt64(status.total_replicas));
col_active_replicas .column->insert(UInt64(status.active_replicas));
}
Block block{
col_database,
col_name,
col_table,
col_engine,
col_is_leader,
col_is_readonly,

View File

@ -61,8 +61,20 @@ BlockInputStreams StorageView::read(
size_t max_block_size,
unsigned threads)
{
ASTPtr inner_query_clone = getInnerQuery();
ASTSelectQuery & inner_select = static_cast<ASTSelectQuery &>(*inner_query_clone);
const ASTSelectQuery & outer_select = typeid_cast<const ASTSelectQuery &>(*query);
/// Пробрасываем внутрь SAMPLE и FINAL, если они есть во внешнем запросе и их нет во внутреннем.
if (outer_select.sample_size && !inner_select.sample_size)
inner_select.sample_size = outer_select.sample_size;
if (outer_select.final && !inner_select.final)
inner_select.final = outer_select.final;
return BlockInputStreams(1,
InterpreterSelectQuery(getInnerQuery(), context, column_names).execute());
InterpreterSelectQuery(inner_query_clone, context, column_names).execute());
}

View File

@ -1 +1 @@
1 1 1 0 1 1 1 0 1 1 1 0 1 1 1 0
1 1 1 0 1 1 0 1 1 1 0 1 1 0

View File

@ -5,7 +5,6 @@ SELECT
1.1 IN (1, -1),
1.0 IN (3, 1., -1),
1 IN (3, 2, 1),
-1 IN (255),
toInt16(-1) IN (255),
materialize(-1) IN (-1),
materialize(-1) IN (1, -1, 2),
@ -13,5 +12,4 @@ SELECT
materialize(1.1) IN (1, -1),
materialize(1.0) IN (3, 1., -1),
materialize(1) IN (3, 2, 1),
materialize(-1) IN (255),
materialize(toInt16(-1)) IN (255);

View File

@ -1,20 +0,0 @@
#pragma once
#include <Poco/PatternFormatter.h>
namespace Poco {
/** Отличается от PatternFormatter тем, что использует номер потока не среди
* потоков Poco::Thread, а среди всех потоков, для которых был получен номер (см. ThreadNumber.h)
*/
class Foundation_API PatternFormatterWithOwnThreadNumber : public PatternFormatter
{
public:
PatternFormatterWithOwnThreadNumber() {}
PatternFormatterWithOwnThreadNumber(const std::string & format) : PatternFormatter(format) {}
void format(const Message & msg, std::string & text);
};
}

View File

@ -1,14 +0,0 @@
#include <Poco/Ext/ThreadNumber.h>
#include <Poco/Ext/PatternFormatterWithOwnThreadNumber.h>
namespace Poco {
void PatternFormatterWithOwnThreadNumber::format(const Message & msg, std::string & text)
{
Poco::Message tmp_message(msg);
tmp_message.setTid(ThreadNumber::get());
PatternFormatter::format(tmp_message, text);
}
}