This commit is contained in:
Evgeniy Gatov 2015-11-10 22:12:52 +03:00
commit b8c9e1e967
49 changed files with 1678 additions and 1004 deletions

View File

@ -26,8 +26,6 @@ namespace DB
using Poco::SharedPtr;
class ParallelReplicas;
/// Поток блоков читающих из таблицы и ее имя
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
/// Вектор пар, описывающих таблицы
@ -44,6 +42,7 @@ typedef std::vector<ExternalTableData> ExternalTablesData;
class Connection : private boost::noncopyable
{
friend class ParallelReplicas;
friend class MultiplexedConnections;
public:
Connection(const String & host_, UInt16 port_, const String & default_database_,

View File

@ -57,7 +57,7 @@ protected:
typedef SharedPtr<IConnectionPool> ConnectionPoolPtr;
typedef std::vector<ConnectionPoolPtr> ConnectionPools;
typedef SharedPtr<ConnectionPools> ConnectionPoolsPtr;
/** Обычный пул соединений, без отказоустойчивости.

View File

@ -6,30 +6,38 @@
#include <Poco/ScopedLock.h>
#include <Poco/Mutex.h>
namespace DB
{
/** Для получения данных сразу из нескольких реплик (соединений) в рамках одного потока.
* В качестве вырожденного случая, может также работать с одним соединением.
/** Для получения данных сразу из нескольких реплик (соединений) из одного или нексольких шардов
* в рамках одного потока. В качестве вырожденного случая, может также работать с одним соединением.
* Предполагается, что все функции кроме sendCancel всегда выполняются в одном потоке.
*
* Интерфейс почти совпадает с Connection.
*/
class ParallelReplicas final : private boost::noncopyable
class MultiplexedConnections final : private boost::noncopyable
{
public:
/// Принимает готовое соединение.
ParallelReplicas(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_);
MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_);
/** Принимает пул, из которого нужно будет достать одно или несколько соединений.
* Если флаг append_extra_info установлен, к каждому полученному блоку прилагается
* дополнительная информация.
* Если флаг get_all_replicas установлен, достаются все соединения.
*/
ParallelReplicas(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info = false, bool get_all_replicas = false);
MultiplexedConnections(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info = false, bool do_broadcast = false);
/** Принимает пулы, один для каждого шарда, из которих нужно будет достать одно или несколько
* соединений.
* Если флаг append_extra_info установлен, к каждому полученному блоку прилагается
* дополнительная информация.
* Если флаг do_broadcast установлен, достаются все соединения.
*/
MultiplexedConnections(ConnectionPools & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info = false, bool do_broadcast = false);
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
@ -65,15 +73,44 @@ public:
/// Проверить, есть ли действительные реплики.
/// Без блокировки, потому что sendCancel() не меняет состояние реплик.
bool hasActiveReplicas() const { return active_replica_count > 0; }
bool hasActiveConnections() const { return active_connection_total_count > 0; }
private:
/// Реплики хэшированные по id сокета
using ReplicaMap = std::unordered_map<int, Connection *>;
/// Соединения 1-го шарда, затем соединения 2-го шарда, и т.д.
using Connections = std::vector<Connection *>;
/// Состояние соединений одного шарда.
struct ShardState
{
/// Количество выделенных соединений, т.е. реплик, для этого шарда.
size_t allocated_connection_count;
/// Текущее количество действительных соединений к репликам этого шарда.
size_t active_connection_count;
};
/// Описание одной реплики.
struct ReplicaState
{
/// Индекс соединения.
size_t connection_index;
/// Владелец этой реплики.
ShardState * shard_state;
};
/// Реплики хэшированные по id сокета.
using ReplicaMap = std::unordered_map<int, ReplicaState>;
/// Состояние каждого шарда.
using ShardStates = std::vector<ShardState>;
private:
/// Зарегистрировать реплику.
void registerReplica(Connection * connection);
void initFromShard(IConnectionPool * pool);
/// Зарегистрировать шарды.
void registerShards();
/// Зарегистрировать реплики одного шарда.
void registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state);
/// Внутренняя версия функции receivePacket без блокировки.
Connection::Packet receivePacketUnlocked();
@ -94,13 +131,15 @@ private:
private:
const Settings * settings;
Connections connections;
ReplicaMap replica_map;
ShardStates shard_states;
/// Если не nullptr, то используется, чтобы ограничить сетевой трафик.
ThrottlerPtr throttler;
std::vector<ConnectionPool::Entry> pool_entries;
ConnectionPool::Entry pool_entry;
/// Соединение, c которого был получен последний блок.
Connection * current_connection;
@ -108,7 +147,7 @@ private:
std::unique_ptr<BlockExtraInfo> block_extra_info;
/// Текущее количество действительных соединений к репликам.
size_t active_replica_count;
size_t active_connection_total_count = 0;
/// Запрос выполняется параллельно на нескольких репликах.
bool supports_parallel_execution;
/// Отправили запрос
@ -116,6 +155,8 @@ private:
/// Отменили запрос
bool cancelled = false;
bool do_broadcast = false;
/// Мьютекс для того, чтобы функция sendCancel могла выполняться безопасно
/// в отдельном потоке.
mutable Poco::FastMutex cancel_mutex;

View File

@ -107,6 +107,18 @@ public:
{
const IAggregateFunction * function = holder->func;
ColumnPtr res = function->getReturnType()->createColumn();
/** Если агрегатная функция возвращает нефинализированное состояние,
* то надо просто скопировать указатели на него а также разделяемое владение аренами.
*/
if (typeid_cast<const ColumnAggregateFunction *>(res.get()))
{
ColumnAggregateFunction * res_ = new ColumnAggregateFunction(*this);
res = res_;
res_->getData().assign(getData().begin(), getData().end());
return res;
}
IColumn & column = *res;
res->reserve(getData().size());

View File

@ -3,67 +3,63 @@
#include <common/logger_useful.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <DB/Common/Throttler.h>
#include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ParallelReplicas.h>
#include <DB/Client/MultiplexedConnections.h>
namespace DB
{
/** Позволяет выполнить запрос (SELECT) на удалённых репликах одного шарда и получить результат.
/** Позволяет выполнить запрос на удалённых репликах одного шарда и получить результат.
*/
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
private:
void init(const Settings * settings_)
{
if (settings_)
{
send_settings = true;
settings = *settings_;
}
else
send_settings = false;
}
public:
/// Принимает готовое соединение.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context_ = getDefaultContext());
/// Принимает готовое соединение. Захватывает владение соединением из пула.
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_),
external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context_ = getDefaultContext());
/// Принимает пул, из которого нужно будет достать одно или несколько соединений.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context_ = getDefaultContext());
/// Принимает пулы - один для каждого шарда -, из которых нужно будет достать одно или несколько соединений.
RemoteBlockInputStream(ConnectionPoolsPtr & pools_, const String & query_, const Settings * settings_,
ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context_ = getDefaultContext());
~RemoteBlockInputStream() override;
/// Отправить запрос на все существующие реплики.
void doBroadcast();
/// Кроме блоков, получить информацию о блоках.
void appendExtraInfo();
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
void readPrefix() override;
/** Отменяем умолчальное уведомление о прогрессе,
* так как колбэк прогресса вызывается самостоятельно.
*/
void progress(const Progress & value) override {}
void cancel() override;
String getName() const override { return "Remote"; }
String getID() const override
{
std::stringstream res;
@ -71,249 +67,35 @@ public:
return res.str();
}
/** Отменяем умолчальное уведомление о прогрессе,
* так как колбэк прогресса вызывается самостоятельно.
*/
void progress(const Progress & value) override {}
void cancel() override
{
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
{
std::lock_guard<std::mutex> lock(external_tables_mutex);
/// Останавливаем отправку внешних данных.
for (auto & vec : external_tables_data)
for (auto & elem : vec)
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(elem.first.get()))
stream->cancel();
}
if (!isQueryPending() || hasThrownException())
return;
tryCancel("Cancelling query");
}
~RemoteBlockInputStream() override
{
/** Если прервались в середине цикла общения с репликами, то прервываем
* все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы
* эти соединения не остались висеть в рассихронизированном состоянии.
*/
if (established || isQueryPending())
parallel_replicas->disconnect();
}
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
void readPrefix() override
{
if (!sent_query)
sendQuery();
}
/// Отправить запрос на все существующие реплики.
void reachAllReplicas()
{
reach_all_replicas = true;
}
/// Кроме блоков, получить информацию о блоках.
void appendExtraInfo()
{
append_extra_info = true;
}
BlockExtraInfo getBlockExtraInfo() const override
{
return parallel_replicas->getBlockExtraInfo();
return multiplexed_connections->getBlockExtraInfo();
}
protected:
/// Отправить на удаленные серверы все временные таблицы.
void sendExternalTables()
{
size_t count = parallel_replicas->size();
void sendExternalTables();
{
std::lock_guard<std::mutex> lock(external_tables_mutex);
Block readImpl() override;
external_tables_data.reserve(count);
for (size_t i = 0; i < count; ++i)
{
ExternalTablesData res;
for (const auto & table : external_tables)
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
else
res.push_back(std::make_pair(input[0], table.first));
}
external_tables_data.push_back(std::move(res));
}
}
parallel_replicas->sendExternalTablesData(external_tables_data);
}
Block readImpl() override
{
if (!sent_query)
{
sendQuery();
if (settings.skip_unavailable_shards && 0 == parallel_replicas->size())
return {};
}
while (true)
{
if (isCancelled())
return Block();
Connection::Packet packet = parallel_replicas->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
/// Если блок не пуст и не является заголовочным блоком
if (packet.block && packet.block.rows() > 0)
return packet.block;
break; /// Если блок пустой - получим другие пакеты до EndOfStream.
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
if (!parallel_replicas->hasActiveReplicas())
{
finished = true;
return Block();
}
break;
case Protocol::Server::Progress:
/** Используем прогресс с удалённого сервера.
* В том числе, запишем его в ProcessList,
* и будем использовать его для проверки
* ограничений (например, минимальная скорость выполнения запроса)
* и квот (например, на количество строчек для чтения).
*/
progressImpl(packet.progress);
break;
case Protocol::Server::ProfileInfo:
info = packet.profile_info;
break;
case Protocol::Server::Totals:
totals = packet.block;
break;
case Protocol::Server::Extremes:
extremes = packet.block;
break;
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
void readSuffixImpl() override
{
/** Если одно из:
* - ничего не начинали делать;
* - получили все пакеты до EndOfStream;
* - получили с одной реплики эксепшен;
* - получили с одной реплики неизвестный пакет;
* - то больше читать ничего не нужно.
*/
if (!isQueryPending() || hasThrownException())
return;
/** Если ещё прочитали не все данные, но они больше не нужны.
* Это может быть из-за того, что данных достаточно (например, при использовании LIMIT).
*/
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
tryCancel("Cancelling query because enough data has been read");
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами.
Connection::Packet packet = parallel_replicas->drain();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
void readSuffixImpl() override;
/// Создать объект для общения с репликами одного шарда, на которых должен выполниться запрос.
void createParallelReplicas()
{
Settings * parallel_replicas_settings = send_settings ? &settings : nullptr;
if (connection != nullptr)
parallel_replicas = std::make_unique<ParallelReplicas>(connection, parallel_replicas_settings, throttler);
else
parallel_replicas = std::make_unique<ParallelReplicas>(pool, parallel_replicas_settings, throttler,
append_extra_info, reach_all_replicas);
}
void createMultiplexedConnections();
/// Возвращает true, если запрос отправлен.
bool isQueryPending() const
{
return sent_query && !finished;
}
bool isQueryPending() const;
/// Возвращает true, если исключение было выкинуто.
bool hasThrownException() const
{
return got_exception_from_replica || got_unknown_packet_from_replica;
}
bool hasThrownException() const;
private:
void sendQuery()
{
createParallelReplicas();
void init(const Settings * settings_);
if (settings.skip_unavailable_shards && 0 == parallel_replicas->size())
return;
void sendQuery();
established = true;
parallel_replicas->sendQuery(query, "", stage, true);
established = false;
sent_query = true;
sendExternalTables();
}
/// Отправить запрос на отмену всех соединений к репликам, если такой запрос ещё не был отправлен.
void tryCancel(const char * reason);
/// ITable::read requires a Context, therefore we should create one if the user can't supply it
static Context & getDefaultContext()
@ -322,23 +104,18 @@ private:
return instance;
}
/// Отправить запрос на отмену всех соединений к репликам, если такой запрос ещё не был отправлен.
void tryCancel(const char * reason)
{
bool old_val = false;
if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
LOG_TRACE(log, "(" << parallel_replicas->dumpAddresses() << ") " << reason);
parallel_replicas->sendCancel();
}
private:
IConnectionPool * pool = nullptr;
/// Готовое соединение.
ConnectionPool::Entry pool_entry;
Connection * connection = nullptr;
std::unique_ptr<ParallelReplicas> parallel_replicas;
/// Пул соединений одного шарда.
IConnectionPool * pool = nullptr;
/// Пулы соединений одного или нескольких шардов.
ConnectionPoolsPtr pools;
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
const String query;
bool send_settings;
@ -384,7 +161,7 @@ private:
std::atomic<bool> got_unknown_packet_from_replica { false };
bool append_extra_info = false;
bool reach_all_replicas = false;
bool do_broadcast = false;
Logger * log = &Logger::get("RemoteBlockInputStream");
};

View File

@ -322,7 +322,7 @@ private:
{
const auto & null_value_ref = std::get<String>(attr.null_values) = null_value.get<String>();
std::get<std::unique_ptr<PODArray<StringRef>>>(attr.arrays) =
std::make_unique<PODArray<StringRef>>(initial_array_size, null_value_ref);
std::make_unique<PODArray<StringRef>>(initial_array_size, StringRef{null_value_ref});
attr.string_arena = std::make_unique<Arena>();
break;
}
@ -393,7 +393,7 @@ private:
{
auto & array = *std::get<std::unique_ptr<PODArray<StringRef>>>(attribute.arrays);
if (id >= array.size())
array.resize_fill(id + 1, std::get<String>(attribute.null_values));
array.resize_fill(id + 1, StringRef{std::get<String>(attribute.null_values)});
const auto & string = value.get<String>();
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());
array[id] = StringRef{string_in_arena, string.size()};

View File

@ -117,7 +117,7 @@ public:
const auto val_it = std::find_if(std::begin(ranges_and_values), std::end(ranges_and_values),
[date] (const value_t<StringRef> & v) { return v.range.contains(date); });
const auto string_ref = val_it != std::end(ranges_and_values) ? val_it->value : null_value;
const auto string_ref = val_it != std::end(ranges_and_values) ? val_it->value : StringRef{null_value};
out->insertData(string_ref.data, string_ref.size);
}
else

View File

@ -1106,15 +1106,22 @@ private:
if (const auto default_col = typeid_cast<const ColumnString *>(default_col_untyped))
{
/// vector ids, vector defaults
const auto out = new ColumnString;
block.getByPosition(result).column = out;
dictionary->getString(attr_name, id_col->getData(), default_col, out);
}
else if (const auto default_col = typeid_cast<const ColumnConst<String> *>(default_col_untyped))
{
/// vector ids, const defaults
const auto out = new ColumnString;
block.getByPosition(result).column = out;
dictionary->getString(attr_name, id_col->getData(), out);
/// @todo avoid materialization
const auto default_col_materialized = default_col->convertToFullColumn();
dictionary->getString(attr_name, id_col->getData(),
static_cast<const ColumnString *>(default_col_materialized.get()), out);
}
else
throw Exception{
@ -1132,19 +1139,25 @@ private:
if (const auto default_col = typeid_cast<const ColumnString *>(default_col_untyped))
{
const PODArray<UInt64> ids(1, id_col->getData());
auto out = std::make_unique<ColumnString>();
dictionary->getString(attr_name, ids, default_col, out.get());
/// const ids, vector defaults
/// @todo avoid materialization
const PODArray<UInt64> ids(id_col->size(), id_col->getData());
const auto out = new ColumnString;
block.getByPosition(result).column = out;
block.getByPosition(result).column = new ColumnConst<String>{
id_col->size(), out->getDataAt(0).toString()
};
dictionary->getString(attr_name, ids, default_col, out);
}
else if (const auto default_col = typeid_cast<const ColumnConst<String> *>(default_col_untyped))
{
/// const ids, const defaults
const PODArray<UInt64> ids(1, id_col->getData());
auto out = std::make_unique<ColumnString>();
dictionary->getString(attr_name, ids, out.get());
/// create ColumnString with default
const auto defs = std::make_unique<ColumnString>();
defs->insert(Field{default_col->getData()});
dictionary->getString(attr_name, ids, defs.get(), out.get());
block.getByPosition(result).column = new ColumnConst<String>{
id_col->size(), out->getDataAt(0).toString()
@ -1179,6 +1192,13 @@ template <> struct DictGetTraits<DATA_TYPE>\
{\
dict->get##TYPE(name, ids, dates, out);\
}\
template <typename DictionaryType>\
static void getOrDefault(\
const DictionaryType * const dict, const std::string & name, const PODArray<UInt64> & ids,\
const PODArray<TYPE> & def, PODArray<TYPE> & out)\
{\
dict->get##TYPE(name, ids, def, out);\
}\
};
DECLARE_DICT_GET_TRAITS(UInt8, DataTypeUInt8)
DECLARE_DICT_GET_TRAITS(UInt16, DataTypeUInt16)
@ -1473,6 +1493,230 @@ using FunctionDictGetDate = FunctionDictGet<DataTypeDate>;
using FunctionDictGetDateTime = FunctionDictGet<DataTypeDateTime>;
template <typename DataType>
class FunctionDictGetOrDefault final : public IFunction
{
using Type = typename DataType::FieldType;
public:
static const std::string name;
static IFunction * create(const Context & context)
{
return new FunctionDictGetOrDefault{context.getExternalDictionaries()};
}
FunctionDictGetOrDefault(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
private:
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 4)
throw Exception{
"Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 4.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
if (!typeid_cast<const DataTypeUInt64 *>(arguments[2].get()))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName()
+ ", must be UInt64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
if (!typeid_cast<const DataType *>(arguments[3].get()))
{
throw Exception{
"Illegal type " + arguments[3]->getName() + " of fourth argument of function " + getName()
+ ", must be " + DataType{}.getName() + ".",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new DataType;
}
void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
if (!dict_name_col)
throw Exception{
"First argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
};
}
template <typename DictionaryType>
bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
const IDictionaryBase * const dictionary)
{
const auto dict = typeid_cast<const DictionaryType *>(dictionary);
if (!dict)
return false;
if (arguments.size() != 4)
throw Exception{
"Function " + getName() + " for dictionary of type " + dict->getTypeName() +
" requires exactly 4 arguments.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
if (!attr_name_col)
throw Exception{
"Second argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
const auto & attr_name = attr_name_col->getData();
const auto id_col_untyped = block.getByPosition(arguments[2]).column.get();
if (const auto id_col = typeid_cast<const ColumnVector<UInt64> *>(id_col_untyped))
executeDispatch(block, arguments, result, dict, attr_name, id_col);
else if (const auto id_col = typeid_cast<const ColumnConst<UInt64> *>(id_col_untyped))
executeDispatch(block, arguments, result, dict, attr_name, id_col);
else
throw Exception{
"Third argument of function " + getName() + " must be UInt64",
ErrorCodes::ILLEGAL_COLUMN
};
return true;
}
template <typename DictionaryType>
void executeDispatch(
Block & block, const ColumnNumbers & arguments, const size_t result, const DictionaryType * const dictionary,
const std::string & attr_name, const ColumnVector<UInt64> * const id_col)
{
const auto default_col_untyped = block.getByPosition(arguments[3]).column.get();
if (const auto default_col = typeid_cast<const ColumnVector<Type> *>(default_col_untyped))
{
/// vector ids, vector defaults
const auto out = new ColumnVector<Type>(id_col->size());
block.getByPosition(result).column = out;
const auto & ids = id_col->getData();
auto & data = out->getData();
const auto & defs = default_col->getData();
DictGetTraits<DataType>::getOrDefault(dictionary, attr_name, ids, defs, data);
}
else if (const auto default_col = typeid_cast<const ColumnConst<Type> *>(default_col_untyped))
{
/// vector ids, const defaults
const auto out = new ColumnVector<Type>(id_col->size());
block.getByPosition(result).column = out;
const auto & ids = id_col->getData();
auto & data = out->getData();
/// @todo avoid materialization
const PODArray<Type> defs(id_col->size(), default_col->getData());
DictGetTraits<DataType>::getOrDefault(dictionary, attr_name, ids, defs, data);
}
else
throw Exception{
"Fourth argument of function " + getName() + " must be " + DataType{}.getName(),
ErrorCodes::ILLEGAL_COLUMN
};
}
template <typename DictionaryType>
void executeDispatch(
Block & block, const ColumnNumbers & arguments, const size_t result, const DictionaryType * const dictionary,
const std::string & attr_name, const ColumnConst<UInt64> * const id_col)
{
const auto default_col_untyped = block.getByPosition(arguments[3]).column.get();
if (const auto default_col = typeid_cast<const ColumnVector<Type> *>(default_col_untyped))
{
/// const ids, vector defaults
/// @todo avoid materialization
const PODArray<UInt64> ids(id_col->size(), id_col->getData());
const auto out = new ColumnVector<Type>(id_col->size());
block.getByPosition(result).column = out;
auto & data = out->getData();
const auto & defs = default_col->getData();
DictGetTraits<DataType>::getOrDefault(dictionary, attr_name, ids, defs, data);
}
else if (const auto default_col = typeid_cast<const ColumnConst<Type> *>(default_col_untyped))
{
/// const ids, const defaults
const PODArray<UInt64> ids(1, id_col->getData());
PODArray<Type> data(1);
const PODArray<Type> defs(1, default_col->getData());
DictGetTraits<DataType>::getOrDefault(dictionary, attr_name, ids, defs, data);
block.getByPosition(result).column = new ColumnConst<Type>{id_col->size(), data.front()};
}
else
throw Exception{
"Fourth argument of function " + getName() + " must be " + DataType{}.getName(),
ErrorCodes::ILLEGAL_COLUMN
};
}
const ExternalDictionaries & dictionaries;
};
template <typename DataType>
const std::string FunctionDictGetOrDefault<DataType>::name = "dictGet" + DataType{}.getName() + "OrDefault";
using FunctionDictGetUInt8OrDefault = FunctionDictGetOrDefault<DataTypeUInt8>;
using FunctionDictGetUInt16OrDefault = FunctionDictGetOrDefault<DataTypeUInt16>;
using FunctionDictGetUInt32OrDefault = FunctionDictGetOrDefault<DataTypeUInt32>;
using FunctionDictGetUInt64OrDefault = FunctionDictGetOrDefault<DataTypeUInt64>;
using FunctionDictGetInt8OrDefault = FunctionDictGetOrDefault<DataTypeInt8>;
using FunctionDictGetInt16OrDefault = FunctionDictGetOrDefault<DataTypeInt16>;
using FunctionDictGetInt32OrDefault = FunctionDictGetOrDefault<DataTypeInt32>;
using FunctionDictGetInt64OrDefault = FunctionDictGetOrDefault<DataTypeInt64>;
using FunctionDictGetFloat32OrDefault = FunctionDictGetOrDefault<DataTypeFloat32>;
using FunctionDictGetFloat64OrDefault = FunctionDictGetOrDefault<DataTypeFloat64>;
using FunctionDictGetDateOrDefault = FunctionDictGetOrDefault<DataTypeDate>;
using FunctionDictGetDateTimeOrDefault = FunctionDictGetOrDefault<DataTypeDateTime>;
class FunctionDictGetHierarchy final : public IFunction
{
public:

View File

@ -743,6 +743,18 @@ struct AggregatedDataVariants : private boost::noncopyable
typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
/** Как считаются "тотальные" значения при наличии WITH TOTALS?
* (Более подробно смотрите в TotalsHavingBlockInputStream.)
*
* В случае отсутствия group_by_overflow_mode = 'any', данные агрегируются как обычно, но состояния агрегатных функций не финализируются.
* Позже, состояния агрегатных функций для всех строк (прошедших через HAVING) мерджатся в одну - это и будет TOTALS.
*
* В случае наличия group_by_overflow_mode = 'any', данные агрегируются как обычно, кроме ключей, не поместившихся в max_rows_to_group_by.
* Для этих ключей, данные агрегируются в одну дополнительную строку - далее см. под названиями overflow_row, overflows...
* Позже, состояния агрегатных функций для всех строк (прошедших через HAVING) мерджатся в одну,
* а также к ним прибавляется или не прибавляется (в зависимости от настройки totals_mode) также overflow_row - это и будет TOTALS.
*/
/** Агрегирует источник блоков.
*/
@ -1032,7 +1044,7 @@ protected:
size_t rows,
Filler && filler) const;
BlocksList prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
BlocksList prepareBlocksAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, boost::threadpool::pool * thread_pool) const;

View File

@ -4,7 +4,6 @@
#include <DB/Storages/AlterCommands.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Parsers/ASTIdentifier.h>
namespace DB
{

View File

@ -2,7 +2,6 @@
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Parsers/ASTIdentifier.h>
namespace DB
{

View File

@ -2,7 +2,6 @@
#include <DB/Storages/IStorage.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/DataStreams/OneBlockInputStream.h>

View File

@ -2,7 +2,6 @@
#include <DB/Storages/IStorage.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/DataStreams/OneBlockInputStream.h>

View File

@ -2,7 +2,6 @@
#include <DB/Storages/IStorage.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>

View File

@ -168,6 +168,8 @@ struct Settings
M(SettingUInt64, select_sequential_consistency, 0) \
/** Максимальное количество различных шардов и максимальное количество реплик одного шарда в функции remote. */ \
M(SettingUInt64, table_function_remote_max_addresses, 1000) \
/** Маскимальное количество потоков при распределённой обработке одного запроса **/ \
M(SettingUInt64, max_distributed_processing_threads, 8) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;

View File

@ -8,7 +8,7 @@
namespace DB
{
/** Идентификатор (столбца или алиас, или именованый элемент кортежа)
/** Идентификатор (столбца или алиас)
*/
class ASTIdentifier : public ASTWithAlias
{
@ -21,7 +21,7 @@ public:
Format,
};
/// имя
/// имя. У составного идентификатора здесь будет конкатенированное имя (вида a.b.c), а отдельные составляюшие будут доступны внутри children.
String name;
/// чего идентифицирует этот идентификатор
@ -44,16 +44,7 @@ public:
}
protected:
void formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
settings.ostr << (settings.hilite ? hilite_identifier : "");
WriteBufferFromOStream wb(settings.ostr, 32);
writeProbablyBackQuotedString(name, wb);
wb.next();
settings.ostr << (settings.hilite ? hilite_none : "");
}
void formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -42,6 +42,7 @@ public:
ASTPtr table; /// "Правая" таблица для соединения - подзапрос или имя таблицы.
ASTPtr using_expr_list; /// По каким столбцам выполнять соединение.
ASTPtr on_expr; /// Условие соединения. Поддерживается либо USING либо ON, но не одновременно.
ASTJoin() = default;
ASTJoin(const StringRange range_) : IAST(range_) {}
@ -108,11 +109,16 @@ protected:
table->formatImpl(settings, state, frame);
if (kind != ASTJoin::Cross)
if (using_expr_list)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " USING " << (settings.hilite ? hilite_none : "");
using_expr_list->formatImpl(settings, state, frame);
}
else if (on_expr)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ON " << (settings.hilite ? hilite_none : "");
on_expr->formatImpl(settings, state, frame);
}
}
};

View File

@ -89,6 +89,15 @@ protected:
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
/** Беззнаковое целое число, используется в качестве правой части оператора взятия элемента кортежа (x.1).
*/
class ParserUnsignedInteger : public IParserBase
{
protected:
const char * getName() const { return "unsigned integer"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
/** Строка в одинарных кавычках.
*/
@ -128,7 +137,13 @@ protected:
*/
class ParserAlias : public IParserBase
{
public:
ParserAlias(bool allow_alias_without_as_keyword_)
: allow_alias_without_as_keyword(allow_alias_without_as_keyword_) {}
protected:
bool allow_alias_without_as_keyword;
static const char * restricted_keywords[];
const char * getName() const { return "alias"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
@ -149,9 +164,11 @@ protected:
class ParserWithOptionalAlias : public IParserBase
{
public:
ParserWithOptionalAlias(ParserPtr && elem_parser_) : elem_parser(std::move(elem_parser_)) {}
ParserWithOptionalAlias(ParserPtr && elem_parser_, bool allow_alias_without_as_keyword_)
: elem_parser(std::move(elem_parser_)), allow_alias_without_as_keyword(allow_alias_without_as_keyword_) {}
protected:
ParserPtr elem_parser;
bool allow_alias_without_as_keyword;
const char * getName() const { return "element of expression with optional alias"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);

View File

@ -111,13 +111,25 @@ protected:
};
class ParserAccessExpression : public IParserBase
class ParserTupleElementExpression : public IParserBase
{
private:
static const char * operators[];
protected:
const char * getName() const { return "access expression"; }
const char * getName() const { return "tuple element expression"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
class ParserArrayElementExpression : public IParserBase
{
private:
static const char * operators[];
protected:
const char * getName() const { return "array element expression"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
@ -127,7 +139,7 @@ class ParserUnaryMinusExpression : public IParserBase
{
private:
static const char * operators[];
ParserPrefixUnaryOperatorExpression operator_parser {operators, ParserPtr(new ParserAccessExpression)};
ParserPrefixUnaryOperatorExpression operator_parser {operators, ParserPtr(new ParserTupleElementExpression)};
protected:
const char * getName() const { return "unary minus expression"; }
@ -270,7 +282,7 @@ protected:
class ParserExpressionWithOptionalAlias : public IParserBase
{
public:
ParserExpressionWithOptionalAlias();
ParserExpressionWithOptionalAlias(bool allow_alias_without_as_keyword);
protected:
ParserPtr impl;
@ -286,7 +298,13 @@ protected:
/** Список выражений, разделённых запятыми, возможно пустой. */
class ParserExpressionList : public IParserBase
{
public:
ParserExpressionList(bool allow_alias_without_as_keyword_)
: allow_alias_without_as_keyword(allow_alias_without_as_keyword_) {}
protected:
bool allow_alias_without_as_keyword;
const char * getName() const { return "list of expressions"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
@ -294,6 +312,9 @@ protected:
class ParserNotEmptyExpressionList : public IParserBase
{
public:
ParserNotEmptyExpressionList(bool allow_alias_without_as_keyword)
: nested_parser(allow_alias_without_as_keyword) {}
private:
ParserExpressionList nested_parser;
protected:

View File

@ -4,7 +4,6 @@
#include <DB/Parsers/ParserQueryWithOutput.h>
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h>

View File

@ -52,11 +52,12 @@ public:
static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);
/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancelAll().
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancel().
* Считает количество таких вызовов для поддержки нескольких наложенных друг на друга отмен.
*/
bool cancelAll() { return cancelled.exchange(true, std::memory_order_relaxed); }
bool uncancelAll() { return cancelled.exchange(false, std::memory_order_relaxed); }
bool isCancelled() const { return cancelled.load(std::memory_order_relaxed); }
void cancel() { cancelled.fetch_add(1); }
void uncancel() { cancelled.fetch_sub(1); }
bool isCancelled() const { return cancelled.load() > 0; }
private:
MergeTreeData & data;
@ -66,24 +67,28 @@ private:
/// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто).
time_t disk_space_warning_time = 0;
std::atomic<bool> cancelled{false};
std::atomic<int> cancelled {0};
};
/** Временно приостанавливает мерджи.
*/
class MergeTreeMergeBlocker
{
public:
MergeTreeMergeBlocker(MergeTreeDataMerger & merger)
: merger(merger), was_cancelled{!merger.cancelAll()} {}
: merger(merger)
{
merger.cancel();
}
~MergeTreeMergeBlocker()
{
if (was_cancelled)
merger.uncancelAll();
merger.uncancel();
}
private:
MergeTreeDataMerger & merger;
const bool was_cancelled;
};
}

View File

@ -159,14 +159,6 @@ public:
*/
if (quorum)
{
static std::once_flag once_flag;
std::call_once(once_flag, [&]
{
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", "");
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/last_part", "");
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/failed_parts", "");
});
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.part_name = part_name;
quorum_entry.required_number_of_replicas = quorum;

View File

@ -344,6 +344,10 @@ private:
*/
void createReplica();
/** Создать узлы в ZK, которые должны быть всегда, но которые могли не существовать при работе старых версий сервера.
*/
void createNewZooKeeperNodes();
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/

View File

@ -0,0 +1,430 @@
#include <DB/Client/MultiplexedConnections.h>
namespace DB
{
MultiplexedConnections::MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_), supports_parallel_execution(false)
{
if (connection_ == nullptr)
throw Exception("Invalid connection specified", ErrorCodes::LOGICAL_ERROR);
active_connection_total_count = 1;
ShardState shard_state;
shard_state.allocated_connection_count = active_connection_total_count;
shard_state.active_connection_count = active_connection_total_count;
shard_states.push_back(shard_state);
ReplicaState replica_state;
replica_state.connection_index = 0;
replica_state.shard_state = &shard_states[0];
connection_->setThrottler(throttler);
connections.push_back(connection_);
auto res = replica_map.emplace(connections[0]->socket.impl()->sockfd(), replica_state);
if (!res.second)
throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR);
}
MultiplexedConnections::MultiplexedConnections(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, bool do_broadcast_)
: settings(settings_), throttler(throttler_), do_broadcast(do_broadcast_)
{
if (pool_ == nullptr)
throw Exception("Invalid pool specified", ErrorCodes::LOGICAL_ERROR);
initFromShard(pool_);
registerShards();
supports_parallel_execution = active_connection_total_count > 1;
if (append_extra_info)
block_extra_info.reset(new BlockExtraInfo);
}
MultiplexedConnections::MultiplexedConnections(ConnectionPools & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, bool do_broadcast_)
: settings(settings_), throttler(throttler_), do_broadcast(do_broadcast_)
{
if (pools_.empty())
throw Exception("Pools are not specified", ErrorCodes::LOGICAL_ERROR);
for (auto & pool : pools_)
{
if (pool.isNull())
throw Exception("Invalid pool specified", ErrorCodes::LOGICAL_ERROR);
initFromShard(pool.get());
}
registerShards();
supports_parallel_execution = active_connection_total_count > 1;
if (append_extra_info)
block_extra_info.reset(new BlockExtraInfo);
}
void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_connection_total_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_map)
{
ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
void MultiplexedConnections::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
{
if (settings == nullptr)
{
/// Каждый шард имеет один адрес.
auto it = connections.begin();
for (size_t i = 0; i < shard_states.size(); ++i)
{
Connection * connection = *it;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, nullptr, with_pending_data);
++it;
}
}
else
{
/// Каждый шард имеет одну или несколько реплик.
auto it = connections.begin();
for (const auto & shard_state : shard_states)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = shard_state.active_connection_count;
UInt64 offset = 0;
for (size_t i = 0; i < shard_state.allocated_connection_count; ++i)
{
Connection * connection = *it;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
++it;
}
}
}
}
else
{
Connection * connection = connections[0];
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, settings, with_pending_data);
}
sent_query = true;
}
Connection::Packet MultiplexedConnections::receivePacket()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
const auto & packet = receivePacketUnlocked();
if (block_extra_info)
{
if (packet.type == Protocol::Server::Data)
current_connection->fillBlockExtraInfo(*block_extra_info);
else
block_extra_info->is_valid = false;
}
return packet;
}
BlockExtraInfo MultiplexedConnections::getBlockExtraInfo() const
{
if (!block_extra_info)
throw Exception("MultiplexedConnections object not configured for block extra info support",
ErrorCodes::LOGICAL_ERROR);
return *block_extra_info;
}
void MultiplexedConnections::disconnect()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
for (auto it = replica_map.begin(); it != replica_map.end(); ++it)
{
ReplicaState & state = it->second;
Connection * connection = connections[state.connection_index];
if (connection != nullptr)
{
connection->disconnect();
invalidateReplica(it);
}
}
}
void MultiplexedConnections::sendCancel()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (const auto & e : replica_map)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet MultiplexedConnections::drain()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (hasActiveConnections())
{
Connection::Packet packet = receivePacketUnlocked();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::EndOfStream:
break;
case Protocol::Server::Exception:
default:
/// Если мы получили исключение или неизвестный пакет, сохраняем его.
res = packet;
break;
}
}
return res;
}
std::string MultiplexedConnections::dumpAddresses() const
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
return dumpAddressesUnlocked();
}
std::string MultiplexedConnections::dumpAddressesUnlocked() const
{
bool is_first = true;
std::ostringstream os;
for (const auto & e : replica_map)
{
const ReplicaState & state = e.second;
const Connection * connection = connections[state.connection_index];
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getDescription();
is_first = false;
}
}
return os.str();
}
void MultiplexedConnections::initFromShard(IConnectionPool * pool)
{
auto entries = pool->getMany(settings, do_broadcast);
/// Если getMany() не выделил соединений и не кинул исключения, это значит, что была
/// установлена настройка skip_unavailable_shards. Тогда просто возвращаемся.
if (entries.empty())
return;
ShardState shard_state;
shard_state.allocated_connection_count = entries.size();
shard_state.active_connection_count = entries.size();
active_connection_total_count += shard_state.active_connection_count;
shard_states.push_back(shard_state);
pool_entries.insert(pool_entries.end(), entries.begin(), entries.end());
}
void MultiplexedConnections::registerShards()
{
replica_map.reserve(pool_entries.size());
connections.reserve(pool_entries.size());
size_t offset = 0;
for (auto & shard_state : shard_states)
{
size_t index_begin = offset;
size_t index_end = offset + shard_state.allocated_connection_count;
registerReplicas(index_begin, index_end, shard_state);
offset = index_end;
}
}
void MultiplexedConnections::registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state)
{
for (size_t i = index_begin; i < index_end; ++i)
{
ReplicaState replica_state;
replica_state.connection_index = i;
replica_state.shard_state = &shard_state;
Connection * connection = &*(pool_entries[i]);
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->setThrottler(throttler);
connections.push_back(connection);
auto res = replica_map.emplace(connection->socket.impl()->sockfd(), replica_state);
if (!res.second)
throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR);
}
}
Connection::Packet MultiplexedConnections::receivePacketUnlocked()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveConnections())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
ReplicaState & state = it->second;
current_connection = connections[state.connection_index];
if (current_connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
Connection::Packet packet = current_connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
break;
case Protocol::Server::Exception:
default:
current_connection->disconnect();
invalidateReplica(it);
break;
}
return packet;
}
MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::getReplicaForReading()
{
ReplicaMap::iterator it;
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
const ReplicaState & state = it->second;
Connection * connection = connections[state.connection_index];
if (connection == nullptr)
it = replica_map.end();
}
return it;
}
MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_connection_total_count);
/// Сначала проверяем, есть ли данные, которые уже лежат в буфере
/// хоть одного соединения.
for (const auto & e : replica_map)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
/// Если не было найдено никаких данных, то проверяем, есть ли соединения
/// готовые для чтения.
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (const auto & e : replica_map)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->receive_timeout);
if (n == 0)
throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
}
void MultiplexedConnections::invalidateReplica(MultiplexedConnections::ReplicaMap::iterator it)
{
ReplicaState & state = it->second;
ShardState * shard_state = state.shard_state;
connections[state.connection_index] = nullptr;
--shard_state->active_connection_count;
--active_connection_total_count;
}
}

View File

@ -1,328 +0,0 @@
#include <DB/Client/ParallelReplicas.h>
namespace DB
{
ParallelReplicas::ParallelReplicas(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_),
active_replica_count(1),
supports_parallel_execution(false)
{
registerReplica(connection_);
}
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, bool get_all_replicas)
: settings(settings_), throttler(throttler_)
{
if (pool_ == nullptr)
throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR);
bool has_many_replicas = get_all_replicas || ((settings != nullptr) && (settings->max_parallel_replicas > 1));
if (has_many_replicas)
{
pool_entries = pool_->getMany(settings, get_all_replicas);
active_replica_count = pool_entries.size();
supports_parallel_execution = (active_replica_count > 1);
if (active_replica_count == 0)
throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR);
replica_map.reserve(active_replica_count);
for (auto & entry : pool_entries)
registerReplica(&*entry);
}
else
{
active_replica_count = 1;
supports_parallel_execution = false;
pool_entry = pool_->get(settings);
if (!pool_entry.isNull())
registerReplica(&*pool_entry);
}
if (append_extra_info)
block_extra_info.reset(new BlockExtraInfo);
}
void ParallelReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_replica_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = active_replica_count;
UInt64 offset = 0;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
{
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
if (offset > 0)
sent_query = true;
}
else
{
auto it = replica_map.begin();
Connection * connection = it->second;
if (connection != nullptr)
{
connection->sendQuery(query, query_id, stage, settings, with_pending_data);
sent_query = true;
}
}
}
Connection::Packet ParallelReplicas::receivePacket()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
const auto & packet = receivePacketUnlocked();
if (block_extra_info)
{
if (packet.type == Protocol::Server::Data)
current_connection->fillBlockExtraInfo(*block_extra_info);
else
block_extra_info->is_valid = false;
}
return packet;
}
BlockExtraInfo ParallelReplicas::getBlockExtraInfo() const
{
if (!block_extra_info)
throw Exception("ParallelReplicas object not configured for block extra info support",
ErrorCodes::LOGICAL_ERROR);
return *block_extra_info;
}
void ParallelReplicas::disconnect()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
for (auto it = replica_map.begin(); it != replica_map.end(); ++it)
{
Connection * connection = it->second;
if (connection != nullptr)
{
connection->disconnect();
invalidateReplica(it);
}
}
}
void ParallelReplicas::sendCancel()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ParallelReplicas::drain()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (hasActiveReplicas())
{
Connection::Packet packet = receivePacketUnlocked();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::EndOfStream:
break;
case Protocol::Server::Exception:
default:
/// Если мы получили исключение или неизвестный пакет, сохраняем его.
res = packet;
break;
}
}
return res;
}
std::string ParallelReplicas::dumpAddresses() const
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
return dumpAddressesUnlocked();
}
std::string ParallelReplicas::dumpAddressesUnlocked() const
{
bool is_first = true;
std::ostringstream os;
for (auto & e : replica_map)
{
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getDescription();
is_first = false;
}
}
return os.str();
}
void ParallelReplicas::registerReplica(Connection * connection)
{
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);
auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
if (!res.second)
throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);
connection->setThrottler(throttler);
}
Connection::Packet ParallelReplicas::receivePacketUnlocked()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveReplicas())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
current_connection = it->second;
Connection::Packet packet = current_connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
break;
case Protocol::Server::Exception:
default:
current_connection->disconnect();
invalidateReplica(it);
break;
}
return packet;
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading()
{
ReplicaMap::iterator it;
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
if (it->second == nullptr)
it = replica_map.end();
}
return it;
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_replica_count);
/// Сначала проверяем, есть ли данные, которые уже лежат в буфере
/// хоть одного соединения.
for (auto & e : replica_map)
{
Connection * connection = e.second;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
/// Если не было найдено никаких данных, то проверяем, есть ли соединения
/// готовые для чтения.
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->receive_timeout);
if (n == 0)
throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
}
void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it)
{
it->second = nullptr;
--active_replica_count;
}
}

View File

@ -0,0 +1,290 @@
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
namespace DB
{
RemoteBlockInputStream::RemoteBlockInputStream(Connection & connection_, const String & query_,
const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, const Context & context_)
: connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
{
init(settings_);
}
RemoteBlockInputStream::RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_,
const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, const Context & context_)
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_),
external_tables(external_tables_), stage(stage_), context(context_)
{
init(settings_);
}
RemoteBlockInputStream::RemoteBlockInputStream(IConnectionPool * pool_, const String & query_,
const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, const Context & context_)
: pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
{
init(settings_);
}
RemoteBlockInputStream::RemoteBlockInputStream(ConnectionPoolsPtr & pools_, const String & query_,
const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, const Context & context_)
: pools(pools_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
{
init(settings_);
}
RemoteBlockInputStream::~RemoteBlockInputStream()
{
/** Если прервались в середине цикла общения с репликами, то прервываем
* все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы
* эти соединения не остались висеть в рассихронизированном состоянии.
*/
if (established || isQueryPending())
multiplexed_connections->disconnect();
}
void RemoteBlockInputStream::doBroadcast()
{
do_broadcast = true;
}
void RemoteBlockInputStream::appendExtraInfo()
{
append_extra_info = true;
}
void RemoteBlockInputStream::readPrefix()
{
if (!sent_query)
sendQuery();
}
void RemoteBlockInputStream::cancel()
{
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
{
std::lock_guard<std::mutex> lock(external_tables_mutex);
/// Останавливаем отправку внешних данных.
for (auto & vec : external_tables_data)
for (auto & elem : vec)
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(elem.first.get()))
stream->cancel();
}
if (!isQueryPending() || hasThrownException())
return;
tryCancel("Cancelling query");
}
void RemoteBlockInputStream::sendExternalTables()
{
size_t count = multiplexed_connections->size();
{
std::lock_guard<std::mutex> lock(external_tables_mutex);
external_tables_data.reserve(count);
for (size_t i = 0; i < count; ++i)
{
ExternalTablesData res;
for (const auto & table : external_tables)
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
else
res.push_back(std::make_pair(input[0], table.first));
}
external_tables_data.push_back(std::move(res));
}
}
multiplexed_connections->sendExternalTablesData(external_tables_data);
}
Block RemoteBlockInputStream::readImpl()
{
if (!sent_query)
{
sendQuery();
if (settings.skip_unavailable_shards && (0 == multiplexed_connections->size()))
return {};
}
while (true)
{
if (isCancelled())
return Block();
Connection::Packet packet = multiplexed_connections->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
/// Если блок не пуст и не является заголовочным блоком
if (packet.block && (packet.block.rows() > 0))
return packet.block;
break; /// Если блок пуст - получим другие пакеты до EndOfStream.
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
if (!multiplexed_connections->hasActiveConnections())
{
finished = true;
return Block();
}
break;
case Protocol::Server::Progress:
/** Используем прогресс с удалённого сервера.
* В том числе, запишем его в ProcessList,
* и будем использовать его для проверки
* ограничений (например, минимальная скорость выполнения запроса)
* и квот (например, на количество строчек для чтения).
*/
progressImpl(packet.progress);
break;
case Protocol::Server::ProfileInfo:
info = packet.profile_info;
break;
case Protocol::Server::Totals:
totals = packet.block;
break;
case Protocol::Server::Extremes:
extremes = packet.block;
break;
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
void RemoteBlockInputStream::readSuffixImpl()
{
/** Если одно из:
* - ничего не начинали делать;
* - получили все пакеты до EndOfStream;
* - получили с одной реплики эксепшен;
* - получили с одной реплики неизвестный пакет;
* то больше читать ничего не нужно.
*/
if (!isQueryPending() || hasThrownException())
return;
/** Если ещё прочитали не все данные, но они больше не нужны.
* Это может быть из-за того, что данных достаточно (например, при использовании LIMIT).
*/
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
tryCancel("Cancelling query because enough data has been read");
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами.
Connection::Packet packet = multiplexed_connections->drain();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
void RemoteBlockInputStream::createMultiplexedConnections()
{
Settings * multiplexed_connections_settings = send_settings ? &settings : nullptr;
if (connection != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(connection, multiplexed_connections_settings, throttler);
else if (pool != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(pool, multiplexed_connections_settings, throttler,
append_extra_info, do_broadcast);
else if (!pools.isNull())
multiplexed_connections = std::make_unique<MultiplexedConnections>(*pools, multiplexed_connections_settings, throttler,
append_extra_info, do_broadcast);
else
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
}
void RemoteBlockInputStream::init(const Settings * settings_)
{
if (settings_)
{
send_settings = true;
settings = *settings_;
}
else
send_settings = false;
}
void RemoteBlockInputStream::sendQuery()
{
createMultiplexedConnections();
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;
established = true;
multiplexed_connections->sendQuery(query, "", stage, true);
established = false;
sent_query = true;
sendExternalTables();
}
void RemoteBlockInputStream::tryCancel(const char * reason)
{
bool old_val = false;
if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
LOG_TRACE(log, "(" << multiplexed_connections->dumpAddresses() << ") " << reason);
multiplexed_connections->sendCancel();
}
bool RemoteBlockInputStream::isQueryPending() const
{
return sent_query && !finished;
}
bool RemoteBlockInputStream::hasThrownException() const
{
return got_exception_from_replica || got_unknown_packet_from_replica;
}
}

View File

@ -69,7 +69,7 @@ DataTypePtr DataTypeFactory::get(const String & name) const
DataTypes argument_types;
Array params_row;
ParserExpressionList args_parser;
ParserExpressionList args_parser(false);
ASTPtr args_ast = parseQuery(args_parser, parameters.data(), parameters.data() + parameters.size(), "parameters for data type " + name);
ASTExpressionList & args_list = typeid_cast<ASTExpressionList &>(*args_ast);
@ -136,7 +136,7 @@ DataTypePtr DataTypeFactory::get(const String & name) const
if (base_name == "Tuple")
{
ParserExpressionList columns_p;
ParserExpressionList columns_p(false);
ASTPtr columns_ast = parseQuery(columns_p, parameters.data(), parameters.data() + parameters.size(), "parameters for data type " + name);
DataTypes elems;

View File

@ -36,6 +36,18 @@ void registerFunctionsDictionaries(FunctionFactory & factory)
factory.registerFunction<FunctionDictGetString>();
factory.registerFunction<FunctionDictGetHierarchy>();
factory.registerFunction<FunctionDictIsIn>();
factory.registerFunction<FunctionDictGetUInt8OrDefault>();
factory.registerFunction<FunctionDictGetUInt16OrDefault>();
factory.registerFunction<FunctionDictGetUInt32OrDefault>();
factory.registerFunction<FunctionDictGetUInt64OrDefault>();
factory.registerFunction<FunctionDictGetInt8OrDefault>();
factory.registerFunction<FunctionDictGetInt16OrDefault>();
factory.registerFunction<FunctionDictGetInt32OrDefault>();
factory.registerFunction<FunctionDictGetInt64OrDefault>();
factory.registerFunction<FunctionDictGetFloat32OrDefault>();
factory.registerFunction<FunctionDictGetFloat64OrDefault>();
factory.registerFunction<FunctionDictGetDateOrDefault>();
factory.registerFunction<FunctionDictGetDateTimeOrDefault>();
factory.registerFunction<FunctionDictGetStringOrDefault>();
}

View File

@ -929,7 +929,7 @@ Block Aggregator::prepareBlockAndFill(
}
BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const
BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
{
size_t rows = 1;
@ -957,7 +957,8 @@ BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & d
};
Block block = prepareBlockAndFill(data_variants, final, rows, filler);
if (overflow_row)
if (is_overflows)
block.info.is_overflows = true;
BlocksList blocks;
@ -1143,7 +1144,8 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
return BlocksList();
if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row)
blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey(data_variants, final));
blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey(
data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key));
if (isCancelled())
return BlocksList();

View File

@ -125,11 +125,6 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_)
storage = context.getTable(database_name, table_name);
}
if (!storage->supportsParallelReplicas() && (settings.parallel_replicas_count > 0))
throw Exception("Storage engine " + storage->getName()
+ " does not support parallel execution on several replicas",
ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS);
table_lock = storage->lockStructure(false);
if (table_column_names.empty())
table_column_names = storage->getColumnsListNonMaterialized();

View File

@ -31,7 +31,7 @@ int main(int argc, char ** argv)
}
ASTPtr root;
ParserPtr parsers[] = {ParserPtr(new ParserSelectQuery), ParserPtr(new ParserExpressionList)};
ParserPtr parsers[] = {ParserPtr(new ParserSelectQuery), ParserPtr(new ParserExpressionList(false))};
for (size_t i = 0; i < sizeof(parsers)/sizeof(parsers[0]); ++i)
{
IParser & parser = *parsers[i];

View File

@ -46,7 +46,7 @@ int main(int argc, char ** argv)
DB::WriteBufferFromOStream out(std::cout);
DB::BlockInputStreamPtr query_plan;
DB::executeQuery(in, out, context, query_plan);
DB::executeQuery(in, out, context, query_plan, {});
if (query_plan)
{

View File

@ -0,0 +1,38 @@
#include <DB/Parsers/ASTIdentifier.h>
namespace DB
{
void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
auto format_element = [&](const String & name)
{
settings.ostr << (settings.hilite ? hilite_identifier : "");
WriteBufferFromOStream wb(settings.ostr, 32);
writeProbablyBackQuotedString(name, wb);
wb.next();
settings.ostr << (settings.hilite ? hilite_none : "");
};
/// Простой или составной идентификатор?
if (children.size() > 1)
{
for (size_t i = 0, size = children.size(); i < size; ++i)
{
if (i != 0)
settings.ostr << '.';
format_element(static_cast<const ASTIdentifier &>(*children[i].get()).name);
}
}
else
{
format_element(name);
}
}
}

View File

@ -18,6 +18,7 @@
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/formatAST.h>
namespace DB
@ -29,7 +30,7 @@ bool ParserArray::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_
Pos begin = pos;
ASTPtr contents_node;
ParserString open("["), close("]");
ParserExpressionList contents;
ParserExpressionList contents(false);
ParserWhiteSpaceOrComments ws;
if (!open.ignore(pos, end, max_parsed_pos, expected))
@ -58,7 +59,7 @@ bool ParserParenthesisExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, P
Pos begin = pos;
ASTPtr contents_node;
ParserString open("("), close(")");
ParserExpressionList contents;
ParserExpressionList contents(false);
ParserWhiteSpaceOrComments ws;
if (!open.ignore(pos, end, max_parsed_pos, expected))
@ -165,44 +166,27 @@ bool ParserCompoundIdentifier::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos
{
Pos begin = pos;
/// Идентификатор в обратных кавычках
if (pos != end && *pos == '`')
{
ReadBuffer buf(const_cast<char *>(pos), end - pos, 0);
String s;
readBackQuotedString(s, buf);
pos += buf.count();
node = new ASTIdentifier(StringRange(begin, pos), s);
return true;
}
else
{
while (pos != end)
{
while (pos != end
&& ((*pos >= 'a' && *pos <= 'z')
|| (*pos >= 'A' && *pos <= 'Z')
|| (*pos == '_')
|| (pos != begin && *pos >= '0' && *pos <= '9')))
++pos;
ASTPtr id_list;
if (!ParserList(ParserPtr(new ParserIdentifier), ParserPtr(new ParserString(".")), false)
.parse(pos, end, id_list, max_parsed_pos, expected))
return false;
/// Если следующий символ - точка '.' и за ней следует, не цифра,
/// то продолжаем парсинг имени идентификатора
if (pos != begin && pos + 1 < end && *pos == '.' &&
!(*(pos + 1) >= '0' && *(pos + 1) <= '9'))
++pos;
else
break;
}
if (pos != begin)
{
node = new ASTIdentifier(StringRange(begin, pos), String(begin, pos - begin));
return true;
}
else
return false;
String name;
const ASTExpressionList & list = static_cast<const ASTExpressionList &>(*id_list.get());
for (const auto & child : list.children)
{
if (!name.empty())
name += '.';
name += static_cast<const ASTIdentifier &>(*child.get()).name;
}
node = new ASTIdentifier(StringRange(begin, pos), name);
/// В children запомним идентификаторы-составляющие, если их больше одного.
if (list.children.size() > 1)
node->children.insert(node->children.end(), list.children.begin(), list.children.end());
return true;
}
@ -212,7 +196,7 @@ bool ParserFunction::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pars
ParserIdentifier id_parser;
ParserString open("("), close(")");
ParserExpressionList contents;
ParserExpressionList contents(false);
ParserWhiteSpaceOrComments ws;
ASTPtr identifier;
@ -357,6 +341,29 @@ bool ParserNumber::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed
}
bool ParserUnsignedInteger::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
Field res;
Pos begin = pos;
if (pos == end)
return false;
UInt64 x = 0;
ReadBuffer in(const_cast<char *>(pos), end - pos, 0);
if (!tryReadIntText(x, in) || in.offset() == 0)
{
expected = "unsigned integer";
return false;
}
res = x;
pos += in.offset();
node = new ASTLiteral(StringRange(begin, pos), res);
return true;
}
bool ParserStringLiteral::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
Pos begin = pos;
@ -473,18 +480,48 @@ bool ParserLiteral::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parse
if (str_p.parse(pos, end, node, max_parsed_pos, expected))
return true;
expected = "literal: one of nullptr, number, single quoted string";
expected = "literal: one of NULL, number, single quoted string";
return false;
}
const char * ParserAlias::restricted_keywords[] =
{
"FROM",
"FINAL",
"SAMPLE",
"ARRAY",
"LEFT",
"RIGHT",
"INNER",
"FULL",
"CROSS",
"JOIN",
"ANY",
"ALL",
"ON",
"USING",
"PREWHERE",
"WHERE",
"GROUP",
"WITH",
"HAVING",
"ORDER",
"LIMIT",
"SETTINGS",
"FORMAT",
"UNION",
nullptr
};
bool ParserAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
ParserWhiteSpaceOrComments ws;
ParserString s_as("AS", true, true);
ParserIdentifier id_p;
if (!s_as.parse(pos, end, node, max_parsed_pos, expected))
bool has_as_word = s_as.parse(pos, end, node, max_parsed_pos, expected);
if (!allow_alias_without_as_keyword && !has_as_word)
return false;
ws.ignore(pos, end);
@ -492,6 +529,20 @@ bool ParserAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_
if (!id_p.parse(pos, end, node, max_parsed_pos, expected))
return false;
if (!has_as_word)
{
/** В этом случае алиас не может совпадать с ключевым словом - для того,
* чтобы в запросе "SELECT x FROM t", слово FROM не считалось алиасом,
* а в запросе "SELECT x FRO FROM t", слово FRO считалось алиасом.
*/
const String & name = static_cast<const ASTIdentifier &>(*node.get()).name;
for (const char ** keyword = restricted_keywords; *keyword != nullptr; ++keyword)
if (0 == strcasecmp(name.data(), *keyword))
return false;
}
return true;
}
@ -544,7 +595,7 @@ bool ParserExpressionElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos &
bool ParserWithOptionalAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
ParserWhiteSpaceOrComments ws;
ParserAlias alias_p;
ParserAlias alias_p(allow_alias_without_as_keyword);
if (!elem_parser->parse(pos, end, node, max_parsed_pos, expected))
return false;
@ -574,7 +625,7 @@ bool ParserOrderByElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & ma
Pos begin = pos;
ParserWhiteSpaceOrComments ws;
ParserExpressionWithOptionalAlias elem_p;
ParserExpressionWithOptionalAlias elem_p(false);
ParserString ascending("ASCENDING", true, true);
ParserString descending("DESCENDING", true, true);
ParserString asc("ASC", true, true);

View File

@ -58,10 +58,15 @@ const char * ParserLogicalNotExpression::operators[] =
nullptr
};
const char * ParserAccessExpression::operators[] =
const char * ParserArrayElementExpression::operators[] =
{
"[", "arrayElement",
nullptr
};
const char * ParserTupleElementExpression::operators[] =
{
".", "tupleElement",
"[", "arrayElement",
nullptr
};
@ -460,25 +465,35 @@ bool ParserUnaryMinusExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Po
}
bool ParserAccessExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected &expected)
bool ParserArrayElementExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected &expected)
{
return ParserLeftAssociativeBinaryOperatorList{
operators,
ParserPtr(new ParserExpressionElement),
ParserPtr(new ParserExpressionWithOptionalAlias)
ParserPtr(new ParserExpressionWithOptionalAlias(false))
}.parse(pos, end, node, max_parsed_pos, expected);
}
ParserExpressionWithOptionalAlias::ParserExpressionWithOptionalAlias()
: impl(new ParserWithOptionalAlias(ParserPtr(new ParserLambdaExpression)))
bool ParserTupleElementExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected &expected)
{
return ParserLeftAssociativeBinaryOperatorList{
operators,
ParserPtr(new ParserArrayElementExpression),
ParserPtr(new ParserUnsignedInteger)
}.parse(pos, end, node, max_parsed_pos, expected);
}
ParserExpressionWithOptionalAlias::ParserExpressionWithOptionalAlias(bool allow_alias_without_as_keyword)
: impl(new ParserWithOptionalAlias(ParserPtr(new ParserLambdaExpression), allow_alias_without_as_keyword))
{
}
bool ParserExpressionList::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
return ParserList(ParserPtr(new ParserExpressionWithOptionalAlias), ParserPtr(new ParserString(","))).parse(pos, end, node, max_parsed_pos, expected);
return ParserList(ParserPtr(new ParserExpressionWithOptionalAlias(allow_alias_without_as_keyword)), ParserPtr(new ParserString(","))).parse(pos, end, node, max_parsed_pos, expected);
}

View File

@ -28,9 +28,11 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p
ParserString s_outer("OUTER", true, true);
ParserString s_join("JOIN", true, true);
ParserString s_using("USING", true, true);
ParserString s_on("ON", true, true);
ParserNotEmptyExpressionList exp_list;
ParserWithOptionalAlias subquery(ParserPtr(new ParserSubquery));
ParserNotEmptyExpressionList exp_list(false);
ParserLogicalOrExpression exp_elem;
ParserWithOptionalAlias subquery(ParserPtr(new ParserSubquery), true);
ParserIdentifier identifier;
ws.ignore(pos, end);
@ -93,15 +95,41 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p
if (join->kind != ASTJoin::Cross)
{
if (!s_using.ignore(pos, end, max_parsed_pos, expected))
if (s_using.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
/// Выражение для USING можно указать как в скобках, так и без них.
bool in_parens = ParserString("(").ignore(pos, end);
if (in_parens)
ws.ignore(pos, end);
if (!exp_list.parse(pos, end, join->using_expr_list, max_parsed_pos, expected))
return false;
if (in_parens)
{
ws.ignore(pos, end);
if (!ParserString(")").ignore(pos, end))
return false;
}
ws.ignore(pos, end);
}
else if (s_on.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
if (!exp_elem.parse(pos, end, join->on_expr, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
else
{
expected = "USING or ON";
return false;
ws.ignore(pos, end);
if (!exp_list.parse(pos, end, join->using_expr_list, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
}
join->children.push_back(join->table);

View File

@ -41,8 +41,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ParserString s_union("UNION", true, true);
ParserString s_all("ALL", true, true);
ParserNotEmptyExpressionList exp_list;
ParserExpressionWithOptionalAlias exp_elem;
ParserNotEmptyExpressionList exp_list(false);
ParserNotEmptyExpressionList exp_list_for_select_clause(true); /// Разрешает алиасы без слова AS.
ParserExpressionWithOptionalAlias exp_elem(false);
ParserJoin join;
ParserOrderByExpressionList order_list;
@ -61,7 +62,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ws.ignore(pos, end);
}
if (!exp_list.parse(pos, end, select_query->select_expression_list, max_parsed_pos, expected))
if (!exp_list_for_select_clause.parse(pos, end, select_query->select_expression_list, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
@ -112,6 +113,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
if (s_dot.ignore(pos, end, max_parsed_pos, expected))
{
select_query->database = select_query->table;
ws.ignore(pos, end);
if (!ident.parse(pos, end, select_query->table, max_parsed_pos, expected))
return false;
@ -127,7 +131,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
return false;
/// Может быть указан алиас. На данный момент, он ничего не значит и не используется.
ParserAlias().ignore(pos, end);
ParserAlias(true).ignore(pos, end);
ws.ignore(pos, end);
}

View File

@ -104,6 +104,7 @@ void ReplicatedMergeTreeRestartingThread::run()
time_t new_absolute_delay = 0;
time_t new_relative_delay = 0;
/// TODO Ловить здесь исключение.
checkReplicationDelays(new_absolute_delay, new_relative_delay);
absolute_delay.store(new_absolute_delay, std::memory_order_relaxed);
@ -169,10 +170,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.shutdown_called = false;
storage.shutdown_event.reset();
storage.merger.uncancelAll();
if (storage.unreplicated_merger)
storage.unreplicated_merger->uncancelAll();
storage.queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, &storage);
storage.cleanup_thread.reset(new ReplicatedMergeTreeCleanupThread(storage));
storage.alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, &storage);
@ -181,6 +178,10 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
std::bind(&StorageReplicatedMergeTree::queueTask, &storage, std::placeholders::_1));
storage.queue_task_handle->wake();
storage.merger.uncancel();
if (storage.unreplicated_merger)
storage.unreplicated_merger->uncancel();
return true;
}
catch (...)
@ -328,9 +329,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.parts_to_check_event.set();
storage.replica_is_active_node = nullptr;
storage.merger.cancelAll();
storage.merger.cancel();
if (storage.unreplicated_merger)
storage.unreplicated_merger->cancelAll();
storage.unreplicated_merger->cancel();
LOG_TRACE(log, "Waiting for threads to finish");
if (storage.is_leader_node)
@ -366,140 +367,14 @@ void ReplicatedMergeTreeRestartingThread::goReadOnlyPermanently()
}
static time_t extractTimeOfLogEntryIfGetPart(zkutil::ZooKeeperPtr & zookeeper, const String & name, const String & path)
{
String content;
zkutil::Stat stat;
if (!zookeeper->tryGet(path + "/" + name, content, &stat))
return 0; /// Узел уже успел удалиться.
ReplicatedMergeTreeLogEntry entry;
entry.parse(content, stat);
if (entry.type == ReplicatedMergeTreeLogEntry::GET_PART)
return entry.create_time;
return 0;
}
/// В массиве имён узлов - элементов очереди/лога находит первый, имеющий тип GET_PART и возвращает его время; либо ноль, если не нашёл.
static time_t findFirstGetPartEntry(zkutil::ZooKeeperPtr & zookeeper, const Strings & nodes, const String & path)
{
for (const auto & name : nodes)
{
time_t res = extractTimeOfLogEntryIfGetPart(zookeeper, name, path);
if (res)
return res;
}
return 0;
}
void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay)
{
/** Нужно получить следующую информацию:
* 1. Время последней записи типа GET в логе.
* 2. Время первой записи типа GET в очереди каждой активной реплики
* (или в логе, после log_pointer реплики - то есть, среди записей, ещё не попавших в очередь реплики).
*
* Разница между этими величинами называется (абсолютным) отставанием реплик.
* Кроме абсолютного отставания также будем рассматривать относительное - от реплики с минимальным отставанием.
*
* Если относительное отставание текущей реплики больше некоторого порога,
* и текущая реплика является лидером, то текущая реплика должна уступить лидерство.
*
* Также в случае превышения абсолютного либо относительного отставания некоторого порога, необходимо:
* - не отвечать Ok на некоторую ручку проверки реплик для балансировщика;
* - не принимать соединения для обработки запросов.
* Это делается в других местах.
*
* NOTE Реализация громоздкая, так как нужные значения вынимаются путём обхода списка узлов.
* Могут быть проблемы в случае разрастания лога до большого размера.
*/
out_absolute_delay = 0;
out_relative_delay = 0;
auto zookeeper = storage.getZooKeeper();
/// Последняя запись GET в логе.
String log_path = storage.zookeeper_path + "/log";
Strings log_entries_desc = zookeeper->getChildren(log_path);
std::sort(log_entries_desc.begin(), log_entries_desc.end(), std::greater<String>());
time_t last_entry_to_get_part = findFirstGetPartEntry(zookeeper, log_entries_desc, log_path);
/** Возможно, что в логе нет записей типа GET. Тогда считаем, что никто не отстаёт.
* В очередях у реплик могут быть не выполненные старые записи типа GET,
* которые туда добавлены не из лога, а для восстановления битых кусков.
* Не будем считать это отставанием.
*/
if (!last_entry_to_get_part)
return;
/// Для каждой активной реплики время первой невыполненной записи типа GET, либо ноль, если таких записей нет.
std::map<String, time_t> replicas_first_entry_to_get_part;
Strings active_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election");
for (const auto & node : active_replicas)
{
String replica;
if (!zookeeper->tryGet(storage.zookeeper_path + "/leader_election/" + node, replica))
continue; /// Реплика только что перестала быть активной.
String queue_path = storage.zookeeper_path + "/replicas/" + replica + "/queue";
Strings queue_entries = zookeeper->getChildren(queue_path);
std::sort(queue_entries.begin(), queue_entries.end());
time_t & first_time = replicas_first_entry_to_get_part[replica];
first_time = findFirstGetPartEntry(zookeeper, queue_entries, queue_path);
if (!first_time)
{
/// Ищем среди записей лога после log_pointer для реплики.
String log_pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
String log_min_entry = "log-" + storage.padIndex(parse<UInt64>(log_pointer));
for (const auto & name : log_entries_desc)
{
if (name < log_min_entry)
break;
first_time = extractTimeOfLogEntryIfGetPart(zookeeper, name, log_path);
if (first_time)
break;
}
}
}
if (active_replicas.empty())
{
/// Нет активных реплик. Очень необычная ситуация - как же тогда у нас была сессия с ZK, чтобы это выяснить?
/// Предполагаем, что всё же может быть потенциальный race condition при установке эфемерной ноды для leader election, а значит, это нормально.
LOG_ERROR(log, "No active replicas when checking replication delays: very strange.");
return;
}
time_t first_entry_of_most_recent_replica = -1;
for (const auto & replica_time : replicas_first_entry_to_get_part)
{
if (0 == replica_time.second)
{
/// Есть реплика, которая совсем не отстаёт.
first_entry_of_most_recent_replica = 0;
break;
}
if (replica_time.second > first_entry_of_most_recent_replica)
first_entry_of_most_recent_replica = replica_time.second;
}
time_t our_first_entry_to_get_part = replicas_first_entry_to_get_part[storage.replica_name];
if (0 == our_first_entry_to_get_part)
return; /// Если мы совсем не отстаём.
out_absolute_delay = last_entry_to_get_part - our_first_entry_to_get_part;
out_relative_delay = first_entry_of_most_recent_replica - our_first_entry_to_get_part;
// TODO
LOG_TRACE(log, "Absolute delay: " << out_absolute_delay << ". Relative delay: " << out_relative_delay << ".");
}

View File

@ -4,6 +4,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Storages/StorageBuffer.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Common/setThreadName.h>
#include <Poco/Ext/ThreadNumber.h>

View File

@ -9,6 +9,7 @@
#include <DB/Common/escapeForFileName.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
@ -192,7 +193,33 @@ BlockInputStreams StorageDistributed::read(
if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH)
external_tables = context.getExternalTables();
/// Распределить шарды равномерно по потокам.
size_t remote_count = cluster.getRemoteShardCount();
/// Отключаем мультиплексирование шардов, если есть ORDER BY без GROUP BY.
const ASTSelectQuery & ast = *(static_cast<const ASTSelectQuery *>(modified_query_ast.get()));
bool enable_shard_multiplexing = !(ast.order_expression_list && !ast.group_expression_list);
size_t thread_count;
if (!enable_shard_multiplexing)
thread_count = remote_count;
else if (remote_count == 0)
thread_count = 0;
else if (settings.max_distributed_processing_threads == 0)
thread_count = 1;
else
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
ConnectionPoolsPtr pools;
bool do_init = true;
/// Цикл по шардам.
size_t current_thread = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
if (shard_info.isLocal())
@ -207,17 +234,42 @@ BlockInputStreams StorageDistributed::read(
InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
/** Материализация нужна, так как с удалённых серверов константы приходят материализованными.
* Если этого не делать, то в разных потоках будут получаться разные типы (Const и не-Const) столбцов,
* а это не разрешено, так как весь код исходит из допущения, что в потоке блоков все типы одинаковые.
*/
* Если этого не делать, то в разных потоках будут получаться разные типы (Const и не-Const) столбцов,
* а это не разрешено, так как весь код исходит из допущения, что в потоке блоков все типы одинаковые.
*/
res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
}
}
else
{
res.emplace_back(new RemoteBlockInputStream{
shard_info.pool, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
size_t excess = (current_thread < remainder) ? 1 : 0;
size_t actual_pools_per_thread = pools_per_thread + excess;
if (actual_pools_per_thread == 1)
{
res.emplace_back(new RemoteBlockInputStream{
shard_info.pool, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
++current_thread;
}
else
{
if (do_init)
{
pools = new ConnectionPools;
do_init = false;
}
pools->push_back(shard_info.pool);
if (pools->size() == actual_pools_per_thread)
{
res.emplace_back(new RemoteBlockInputStream{
pools, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
do_init = true;
++current_thread;
}
}
}
}
@ -278,7 +330,32 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se
BlockInputStreams res;
/// Распределить шарды равномерно по потокам.
size_t remote_count = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
if (shard_info.hasRemoteConnections())
++remote_count;
}
size_t thread_count;
if (remote_count == 0)
thread_count = 0;
else if (settings.max_distributed_processing_threads == 0)
thread_count = 1;
else
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
ConnectionPoolsPtr pools;
bool do_init = true;
/// Цикл по шардам.
size_t current_thread = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
if (shard_info.isLocal())
@ -299,10 +376,37 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se
if (shard_info.hasRemoteConnections())
{
auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler};
stream->reachAllReplicas();
stream->appendExtraInfo();
res.emplace_back(stream);
size_t excess = (current_thread < remainder) ? 1 : 0;
size_t actual_pools_per_thread = pools_per_thread + excess;
if (actual_pools_per_thread == 1)
{
auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler};
stream->doBroadcast();
stream->appendExtraInfo();
res.emplace_back(stream);
++current_thread;
}
else
{
if (do_init)
{
pools = new ConnectionPools;
do_init = false;
}
pools->push_back(shard_info.pool);
if (pools->size() == actual_pools_per_thread)
{
auto stream = new RemoteBlockInputStream{pools, query, &new_settings, throttler};
stream->doBroadcast();
stream->appendExtraInfo();
res.emplace_back(stream);
do_init = true;
++current_thread;
}
}
}
}

View File

@ -100,7 +100,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger.cancelAll();
merger.cancel();
background_pool.removeTask(merge_task_handle);
}

View File

@ -162,6 +162,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
checkParts(skip_sanity_checks);
}
createNewZooKeeperNodes();
initVirtualParts();
String unreplicated_path = full_path + "unreplicated/";
@ -194,6 +196,20 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
auto zookeeper = getZooKeeper();
/// Работа с кворумом.
zookeeper->createIfNotExists(zookeeper_path + "/quorum", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", "");
/// Отслеживание отставания реплик.
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", "");
}
StoragePtr StorageReplicatedMergeTree::create(
const String & zookeeper_path_,
const String & replica_name_,
@ -1801,139 +1817,153 @@ void StorageReplicatedMergeTree::alterThread()
bool changed_version = (stat.version != columns_version);
MergeTreeData::DataParts parts;
/// Если описание столбцов изменилось, обновим структуру таблицы локально.
if (changed_version)
{
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
/// Если потребуется блокировать структуру таблицы, то приостановим мерджи.
std::unique_ptr<MergeTreeMergeBlocker> merge_blocker;
std::unique_ptr<MergeTreeMergeBlocker> unreplicated_merge_blocker;
auto table_lock = lockStructureForAlter();
const auto columns_changed = columns != data.getColumnsListNonMaterialized();
const auto materialized_columns_changed = materialized_columns != data.materialized_columns;
const auto alias_columns_changed = alias_columns != data.alias_columns;
const auto column_defaults_changed = column_defaults != data.column_defaults;
if (columns_changed || materialized_columns_changed || alias_columns_changed ||
column_defaults_changed)
if (changed_version || force_recheck_parts)
{
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns,
materialized_columns, alias_columns, column_defaults, context);
if (columns_changed)
{
data.setColumnsList(columns);
if (unreplicated_data)
unreplicated_data->setColumnsList(columns);
}
if (materialized_columns_changed)
{
this->materialized_columns = materialized_columns;
data.materialized_columns = std::move(materialized_columns);
}
if (alias_columns_changed)
{
this->alias_columns = alias_columns;
data.alias_columns = std::move(alias_columns);
}
if (column_defaults_changed)
{
this->column_defaults = column_defaults;
data.column_defaults = std::move(column_defaults);
}
LOG_INFO(log, "Applied changes to table.");
}
else
{
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
merge_blocker = std::make_unique<MergeTreeMergeBlocker>(merger);
if (unreplicated_merger)
unreplicated_merge_blocker = std::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger);
}
/// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем.
parts = data.getDataParts();
columns_version = stat.version;
}
/// Обновим куски.
if (changed_version || force_recheck_parts)
{
auto table_lock = lockStructure(false);
MergeTreeData::DataParts parts;
/// Если описание столбцов изменилось, обновим структуру таблицы локально.
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
{
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
int changed_parts = 0;
auto table_lock = lockStructureForAlter();
if (!changed_version)
const auto columns_changed = columns != data.getColumnsListNonMaterialized();
const auto materialized_columns_changed = materialized_columns != data.materialized_columns;
const auto alias_columns_changed = alias_columns != data.alias_columns;
const auto column_defaults_changed = column_defaults != data.column_defaults;
if (columns_changed || materialized_columns_changed || alias_columns_changed ||
column_defaults_changed)
{
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns,
materialized_columns, alias_columns, column_defaults, context);
if (columns_changed)
{
data.setColumnsList(columns);
if (unreplicated_data)
unreplicated_data->setColumnsList(columns);
}
if (materialized_columns_changed)
{
this->materialized_columns = materialized_columns;
data.materialized_columns = std::move(materialized_columns);
}
if (alias_columns_changed)
{
this->alias_columns = alias_columns;
data.alias_columns = std::move(alias_columns);
}
if (column_defaults_changed)
{
this->column_defaults = column_defaults;
data.column_defaults = std::move(column_defaults);
}
LOG_INFO(log, "Applied changes to table.");
}
else
{
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
}
/// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем.
parts = data.getDataParts();
const auto columns_plus_materialized = data.getColumnsList();
for (const MergeTreeData::DataPartPtr & part : parts)
{
/// Обновим кусок и запишем результат во временные файлы.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
/// нода /flags/force_alter.
auto transaction = data.alterDataPart(part, columns_plus_materialized);
if (!transaction)
continue;
++changed_parts;
/// Обновим метаданные куска в ZooKeeper.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1));
zookeeper->multi(ops);
/// Применим изменения файлов.
transaction->commit();
columns_version = stat.version;
}
/// То же самое для нереплицируемых данных.
if (unreplicated_data)
/// Обновим куски.
if (changed_version || force_recheck_parts)
{
parts = unreplicated_data->getDataParts();
auto table_lock = lockStructure(false);
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
int changed_parts = 0;
if (!changed_version)
parts = data.getDataParts();
const auto columns_plus_materialized = data.getColumnsList();
for (const MergeTreeData::DataPartPtr & part : parts)
{
auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized);
/// Обновим кусок и запишем результат во временные файлы.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
/// нода /flags/force_alter.
auto transaction = data.alterDataPart(part, columns_plus_materialized);
if (!transaction)
continue;
++changed_parts;
/// Обновим метаданные куска в ZooKeeper.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1));
zookeeper->multi(ops);
/// Применим изменения файлов.
transaction->commit();
}
/// То же самое для нереплицируемых данных.
if (unreplicated_data)
{
parts = unreplicated_data->getDataParts();
for (const MergeTreeData::DataPartPtr & part : parts)
{
auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized);
if (!transaction)
continue;
++changed_parts;
transaction->commit();
}
}
/// Список столбцов для конкретной реплики.
zookeeper->set(replica_path + "/columns", columns_str);
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
}
force_recheck_parts = false;
}
/// Список столбцов для конкретной реплики.
zookeeper->set(replica_path + "/columns", columns_str);
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
}
force_recheck_parts = false;
/// Важно, что уничтожается parts и merge_blocker перед wait-ом.
}
parts.clear();
alter_thread_event->wait();
}
catch (...)

View File

@ -39,7 +39,7 @@ int main(int argc, char ** argv)
const char * begin = primary_expr_str.data();
const char * end = begin + primary_expr_str.size();
const char * max_parsed_pos = begin;
ParserExpressionList parser;
ParserExpressionList parser(false);
if (!parser.parse(begin, end, primary_expr, max_parsed_pos, expected))
throw Poco::Exception("Cannot parse " + primary_expr_str);

View File

@ -0,0 +1 @@
SELECT 1+-a[1].2*2 = -245 ? 'Ok' : 'Fail' AS res FROM (SELECT [('Hello', 123)] AS a);

View File

@ -0,0 +1,2 @@
SELECT 1 x FROM system.one;
SELECT 1 + (2 AS x) y FROM system.one;

View File

@ -0,0 +1,2 @@
SELECT * FROM system . one;
SELECT * FROM system /* Hello */. `one`;