Merge branch 'master' of github.com:yandex/ClickHouse into async-reads

This commit is contained in:
Alexey Milovidov 2021-08-28 01:19:16 +03:00
commit 79e0433ba7
60 changed files with 766 additions and 316 deletions

View File

@ -171,4 +171,4 @@ default
!!! warning "Внимание" !!! warning "Внимание"
Прореживание данных производится во время слияний. Обычно для старых партций слияния не запускаются, поэтому для прореживания надо иницировать незапланированное слияние используя [optimize](../../../sql-reference/statements/optimize.md). Или использовать дополнительные инструменты, например [graphite-ch-optimizer](https://github.com/innogames/graphite-ch-optimizer). Прореживание данных производится во время слияний. Обычно для старых партиций слияния не запускаются, поэтому для прореживания надо инициировать незапланированное слияние используя [optimize](../../../sql-reference/statements/optimize.md). Или использовать дополнительные инструменты, например [graphite-ch-optimizer](https://github.com/innogames/graphite-ch-optimizer).

View File

@ -520,7 +520,7 @@ WHERE (CounterID = 912887) AND (toYYYYMM(StartDate) = 201403) AND (domain(StartU
ClickHouse集群是一个同质集群。 设置步骤: ClickHouse集群是一个同质集群。 设置步骤:
1. 在群集的所有机器上安装ClickHouse服务端 1. 在群集的所有机器上安装ClickHouse服务端
2. 在配置文件中设置集配置 2. 在配置文件中设置集配置
3. 在每个实例上创建本地表 3. 在每个实例上创建本地表
4. 创建一个[分布式表](../engines/table-engines/special/distributed.md) 4. 创建一个[分布式表](../engines/table-engines/special/distributed.md)

View File

@ -271,6 +271,9 @@ try
/// Load global settings from default_profile and system_profile. /// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config()); global_context->setDefaultProfiles(config());
/// We load temporary database first, because projections need it.
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
/** Init dummy default DB /** Init dummy default DB
* NOTE: We force using isolated default database to avoid conflicts with default database from server environment * NOTE: We force using isolated default database to avoid conflicts with default database from server environment
* Otherwise, metadata of temporary File(format, EXPLICIT_PATH) tables will pollute metadata/ directory; * Otherwise, metadata of temporary File(format, EXPLICIT_PATH) tables will pollute metadata/ directory;

View File

@ -1116,15 +1116,15 @@ if (ThreadFuzzer::instance().isEffective())
try try
{ {
auto & database_catalog = DatabaseCatalog::instance();
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context); loadMetadataSystem(global_context);
/// After attaching system databases we can initialize system log. /// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs(); global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded(); global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
auto & database_catalog = DatabaseCatalog::instance();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log) /// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper); attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
/// Then, load remaining databases /// Then, load remaining databases
loadMetadata(global_context, default_database); loadMetadata(global_context, default_database);
database_catalog.loadDatabases(); database_catalog.loadDatabases();

View File

@ -301,7 +301,7 @@ size_t ColumnUnique<ColumnType>::getNullValueIndex() const
template <typename ColumnType> template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x) size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
{ {
if (x.getType() == Field::Types::Null) if (x.isNull())
return getNullValueIndex(); return getNullValueIndex();
if (valuesHaveFixedSize()) if (valuesHaveFixedSize())

View File

@ -577,16 +577,15 @@
M(606, BACKUP_IS_EMPTY) \ M(606, BACKUP_IS_EMPTY) \
M(607, BACKUP_ELEMENT_DUPLICATE) \ M(607, BACKUP_ELEMENT_DUPLICATE) \
M(608, CANNOT_RESTORE_TABLE) \ M(608, CANNOT_RESTORE_TABLE) \
M(609, CANNOT_ADVISE) \ M(609, FUNCTION_ALREADY_EXISTS) \
M(610, UNKNOWN_READ_METHOD) \ M(610, CANNOT_DROP_SYSTEM_FUNCTION) \
M(611, CANNOT_CREATE_RECURSIVE_FUNCTION) \
M(612, OBJECT_ALREADY_STORED_ON_DISK) \
M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \
M(614, POSTGRESQL_CONNECTION_FAILURE) \
M(615, CANNOT_ADVISE) \
M(616, UNKNOWN_READ_METHOD) \
\ \
M(598, FUNCTION_ALREADY_EXISTS) \
M(599, CANNOT_DROP_SYSTEM_FUNCTION) \
M(600, CANNOT_CREATE_RECURSIVE_FUNCTION) \
M(601, OBJECT_ALREADY_STORED_ON_DISK) \
M(602, OBJECT_WAS_NOT_STORED_ON_DISK) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \ M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \ M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \ M(1001, STD_EXCEPTION) \

View File

@ -64,6 +64,7 @@ size_t TLDListsHolder::parseAndAddTldList(const std::string & name, const std::s
while (!in.eof()) while (!in.eof())
{ {
readEscapedStringUntilEOL(line, in); readEscapedStringUntilEOL(line, in);
if (!in.eof())
++in.position(); ++in.position();
/// Skip comments /// Skip comments
if (line.size() > 2 && line[0] == '/' && line[1] == '/') if (line.size() > 2 && line[0] == '/' && line[1] == '/')

View File

@ -387,6 +387,7 @@ void ZooKeeper::connect(
} }
socket.connect(node.address, connection_timeout); socket.connect(node.address, connection_timeout);
socket_address = socket.peerAddress();
socket.setReceiveTimeout(operation_timeout); socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout); socket.setSendTimeout(operation_timeout);
@ -1255,7 +1256,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
{ {
elem.type = log_type; elem.type = log_type;
elem.event_time = event_time; elem.event_time = event_time;
elem.address = socket.peerAddress(); elem.address = socket_address;
elem.session_id = session_id; elem.session_id = session_id;
maybe_zk_log->add(elem); maybe_zk_log->add(elem);
} }

View File

@ -199,6 +199,8 @@ private:
Poco::Timespan operation_timeout; Poco::Timespan operation_timeout;
Poco::Net::StreamSocket socket; Poco::Net::StreamSocket socket;
/// To avoid excessive getpeername(2) calls.
Poco::Net::SocketAddress socket_address;
std::optional<ReadBufferFromPocoSocket> in; std::optional<ReadBufferFromPocoSocket> in;
std::optional<WriteBufferFromPocoSocket> out; std::optional<WriteBufferFromPocoSocket> out;

View File

@ -35,6 +35,7 @@ bool pathStartsWith(const std::filesystem::path & path, const std::filesystem::p
/// Returns true if path starts with prefix path /// Returns true if path starts with prefix path
bool pathStartsWith(const String & path, const String & prefix_path); bool pathStartsWith(const String & path, const String & prefix_path);
/// Returns true if symlink starts with prefix path
bool symlinkStartsWith(const String & path, const String & prefix_path); bool symlinkStartsWith(const String & path, const String & prefix_path);
} }

View File

@ -284,16 +284,17 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
/// Amount of entries in last log index /// Amount of entries in last log index
uint64_t entries_in_last = 0; uint64_t entries_in_last = 0;
/// Log idx of the first incomplete log (key in existing_changelogs) /// Log idx of the first incomplete log (key in existing_changelogs)
uint64_t first_incomplete_log_start_index = 0; int64_t first_incomplete_log_start_index = -1; /// if -1 then no incomplete log exists
ChangelogReadResult result{}; ChangelogReadResult result{};
/// First log index which was read from all changelogs /// First log index which was read from all changelogs
uint64_t first_read_index = 0; uint64_t first_read_index = 0;
/// We must start to read from this log index
uint64_t start_to_read_from = last_commited_log_index; uint64_t start_to_read_from = last_commited_log_index;
/// If we need to have some reserved log read additional `logs_to_keep` logs
if (start_to_read_from > logs_to_keep) if (start_to_read_from > logs_to_keep)
start_to_read_from -= logs_to_keep; start_to_read_from -= logs_to_keep;
else else
@ -302,14 +303,20 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
/// At least we read something /// At least we read something
bool started = false; bool started = false;
/// Got through changelog files in order of start_index
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs) for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
{ {
entries_in_last = changelog_description.to_log_index - changelog_description.from_log_index + 1; /// How many entries we have in the last changelog
entries_in_last = changelog_description.expectedEntriesCountInLog();
/// [from_log_index.>=.......start_to_read_from.....<=.to_log_index]
if (changelog_description.to_log_index >= start_to_read_from) if (changelog_description.to_log_index >= start_to_read_from)
{ {
if (!started) if (!started) /// still nothing was read
{ {
/// Our first log starts from the more fresh log_id than we required to read and this changelog is not empty log.
/// So we are missing something in our logs, but it's not dataloss, we will receive snapshot and required
/// entries from leader.
if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1) if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1)
{ {
LOG_ERROR(log, "Some records was lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index); LOG_ERROR(log, "Some records was lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index);
@ -317,13 +324,19 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
break; break;
} }
else if (changelog_description.from_log_index > start_to_read_from) else if (changelog_description.from_log_index > start_to_read_from)
{
/// We don't have required amount of reserved logs, but nothing was lost.
LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smallest available log index on disk {}.", start_to_read_from, changelog_description.from_log_index); LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smallest available log index on disk {}.", start_to_read_from, changelog_description.from_log_index);
} }
}
started = true;
ChangelogReader reader(changelog_description.path); ChangelogReader reader(changelog_description.path);
result = reader.readChangelog(logs, start_to_read_from, index_to_start_pos, log); result = reader.readChangelog(logs, start_to_read_from, index_to_start_pos, log);
started = true;
/// Otherwise we have already initialized it
if (first_read_index == 0) if (first_read_index == 0)
first_read_index = result.first_read_index; first_read_index = result.first_read_index;
@ -340,12 +353,12 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
if (first_read_index != 0) if (first_read_index != 0)
start_index = first_read_index; start_index = first_read_index;
else else /// We just may have no logs (only snapshot)
start_index = last_commited_log_index; start_index = last_commited_log_index;
/// Found some broken or non finished logs /// Found some broken or non finished logs
/// We have to remove broken data and continue to write into incomplete log. /// We have to remove broken data and continue to write into incomplete log.
if (first_incomplete_log_start_index != 0) if (first_incomplete_log_start_index != -1) /// otherwise all logs completed so just start a new one
{ {
auto start_remove_from = existing_changelogs.begin(); auto start_remove_from = existing_changelogs.begin();
if (started) if (started)
@ -363,6 +376,9 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
if (!existing_changelogs.empty()) if (!existing_changelogs.empty())
{ {
auto description = existing_changelogs.rbegin()->second; auto description = existing_changelogs.rbegin()->second;
if (description.expectedEntriesCountInLog() != rotate_interval)
LOG_TRACE(log, "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, description.expectedEntriesCountInLog());
LOG_TRACE(log, "Continue to write into {}", description.path); LOG_TRACE(log, "Continue to write into {}", description.path);
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index); current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index);
current_writer->setEntriesWritten(result.entries_read); current_writer->setEntriesWritten(result.entries_read);
@ -425,10 +441,13 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
if (logs.empty()) if (logs.empty())
start_index = index; start_index = index;
if (current_writer->getEntriesWritten() == rotate_interval) const auto & current_changelog_description = existing_changelogs[current_writer->getStartIndex()];
const bool log_is_complete = current_writer->getEntriesWritten() == current_changelog_description.expectedEntriesCountInLog();
if (log_is_complete)
rotate(index); rotate(index);
auto offset = current_writer->appendRecord(buildRecord(index, log_entry)); const auto offset = current_writer->appendRecord(buildRecord(index, log_entry));
if (!index_to_start_pos.try_emplace(index, offset).second) if (!index_to_start_pos.try_emplace(index, offset).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index); throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index);
@ -440,28 +459,31 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
if (index_to_start_pos.count(index) == 0) if (index_to_start_pos.count(index) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index); throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
/// Complex case when we need to override data from already rotated log /// This write_at require to overwrite everything in this file and also in previous file(s)
bool go_to_previous_file = index < current_writer->getStartIndex(); const bool go_to_previous_file = index < current_writer->getStartIndex();
if (go_to_previous_file) if (go_to_previous_file)
{ {
auto index_changelog = existing_changelogs.lower_bound(index); auto index_changelog = existing_changelogs.lower_bound(index);
ChangelogFileDescription description; ChangelogFileDescription description;
if (index_changelog->first == index)
if (index_changelog->first == index) /// exactly this file starts from index
description = index_changelog->second; description = index_changelog->second;
else else
description = std::prev(index_changelog)->second; description = std::prev(index_changelog)->second;
/// Initialize writer from this log file
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, index_changelog->first); current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, index_changelog->first);
current_writer->setEntriesWritten(description.to_log_index - description.from_log_index + 1); current_writer->setEntriesWritten(description.to_log_index - description.from_log_index + 1);
} }
auto entries_written = current_writer->getEntriesWritten(); /// Truncate current file
current_writer->truncateToLength(index_to_start_pos[index]); current_writer->truncateToLength(index_to_start_pos[index]);
if (go_to_previous_file) if (go_to_previous_file)
{ {
/// Remove all subsequent files /// Remove all subsequent files if overwritten something in previous one
auto to_remove_itr = existing_changelogs.upper_bound(index); auto to_remove_itr = existing_changelogs.upper_bound(index);
for (auto itr = to_remove_itr; itr != existing_changelogs.end();) for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
{ {
@ -470,7 +492,9 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
} }
} }
auto entries_written = current_writer->getEntriesWritten();
/// Remove redundant logs from memory /// Remove redundant logs from memory
/// Everything >= index must be removed
for (uint64_t i = index; ; ++i) for (uint64_t i = index; ; ++i)
{ {
auto log_itr = logs.find(i); auto log_itr = logs.find(i);
@ -481,9 +505,9 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
index_to_start_pos.erase(i); index_to_start_pos.erase(i);
entries_written--; entries_written--;
} }
current_writer->setEntriesWritten(entries_written); current_writer->setEntriesWritten(entries_written);
/// Now we can actually override entry at index
appendEntry(index, log_entry); appendEntry(index, log_entry);
} }
@ -511,7 +535,7 @@ LogEntryPtr Changelog::getLastEntry() const
/// This entry treaded in special way by NuRaft /// This entry treaded in special way by NuRaft
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(uint64_t))); static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(uint64_t)));
uint64_t next_index = getNextEntryIndex() - 1; const uint64_t next_index = getNextEntryIndex() - 1;
auto entry = logs.find(next_index); auto entry = logs.find(next_index);
if (entry == logs.end()) if (entry == logs.end())
return fake_entry; return fake_entry;

View File

@ -53,6 +53,12 @@ struct ChangelogFileDescription
uint64_t to_log_index; uint64_t to_log_index;
std::string path; std::string path;
/// How many entries should be stored in this log
uint64_t expectedEntriesCountInLog() const
{
return to_log_index - from_log_index + 1;
}
}; };
class ChangelogWriter; class ChangelogWriter;
@ -128,10 +134,16 @@ private:
const bool force_sync; const bool force_sync;
Poco::Logger * log; Poco::Logger * log;
/// Currently existing changelogs
std::map<uint64_t, ChangelogFileDescription> existing_changelogs; std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
/// Current writer for changelog file
std::unique_ptr<ChangelogWriter> current_writer; std::unique_ptr<ChangelogWriter> current_writer;
/// Mapping log_id -> binary offset in log file
IndexToOffset index_to_start_pos; IndexToOffset index_to_start_pos;
/// Mapping log_id -> log_entry
IndexToLogEntry logs; IndexToLogEntry logs;
/// Start log_id which exists in all "active" logs
uint64_t start_index = 0; uint64_t start_index = 0;
}; };

View File

@ -106,32 +106,27 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
{ {
const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request); const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request);
int64_t session_id; int64_t session_id;
{
std::lock_guard lock(storage_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
}
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>(); std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
response->internal_id = session_id_request.internal_id; response->internal_id = session_id_request.internal_id;
response->session_id = session_id;
response->server_id = session_id_request.server_id; response->server_id = session_id_request.server_id;
KeeperStorage::ResponseForSession response_for_session; KeeperStorage::ResponseForSession response_for_session;
response_for_session.session_id = -1; response_for_session.session_id = -1;
response_for_session.response = response; response_for_session.response = response;
{
std::lock_guard lock(storage_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
responses_queue.push(response_for_session); responses_queue.push(response_for_session);
} }
}
else else
{ {
KeeperStorage::ResponsesForSessions responses_for_sessions; std::lock_guard lock(storage_and_responses_lock);
{ KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
std::lock_guard lock(storage_lock);
responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
for (auto & response_for_session : responses_for_sessions) for (auto & response_for_session : responses_for_sessions)
responses_queue.push(response_for_session); responses_queue.push(response_for_session);
} }
}
last_committed_idx = log_idx; last_committed_idx = log_idx;
return nullptr; return nullptr;
@ -150,7 +145,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
} }
{ /// deserialize and apply snapshot to storage { /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr); std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
} }
@ -175,7 +170,7 @@ void KeeperStateMachine::create_snapshot(
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf); auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task; CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy); snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
} }
@ -198,7 +193,7 @@ void KeeperStateMachine::create_snapshot(
{ {
/// Must do it with lock (clearing elements from list) /// Must do it with lock (clearing elements from list)
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
/// Turn off "snapshot mode" and clear outdate part of storage state /// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot(); storage->clearGarbageAfterSnapshot();
/// Destroy snapshot with lock /// Destroy snapshot with lock
@ -236,7 +231,7 @@ void KeeperStateMachine::save_logical_snp_obj(
nuraft::ptr<nuraft::snapshot> cloned_meta; nuraft::ptr<nuraft::snapshot> cloned_meta;
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{ {
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx()); KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot); cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
} }
@ -303,24 +298,21 @@ int KeeperStateMachine::read_logical_snp_obj(
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{ {
/// Pure local request, just process it with storage /// Pure local request, just process it with storage
KeeperStorage::ResponsesForSessions responses; std::lock_guard lock(storage_and_responses_lock);
{ auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
std::lock_guard lock(storage_lock);
responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
}
for (const auto & response : responses) for (const auto & response : responses)
responses_queue.push(response); responses_queue.push(response);
} }
std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions() std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions()
{ {
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getDeadSessions(); return storage->getDeadSessions();
} }
void KeeperStateMachine::shutdownStorage() void KeeperStateMachine::shutdownStorage()
{ {
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
storage->finalize(); storage->finalize();
} }

View File

@ -99,8 +99,12 @@ private:
/// Mutex for snapshots /// Mutex for snapshots
std::mutex snapshots_lock; std::mutex snapshots_lock;
/// Lock for storage /// Lock for storage and responses_queue. It's important to process requests
std::mutex storage_lock; /// and push them to the responses queue while holding this lock. Otherwise
/// we can get strange cases when, for example client send read request with
/// watch and after that receive watch response and only receive response
/// for request.
std::mutex storage_and_responses_lock;
/// Last committed Raft log number. /// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx; std::atomic<uint64_t> last_committed_idx;

View File

@ -151,20 +151,40 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat
} }
auto parent_path = parentPath(path); auto parent_path = parentPath(path);
it = list_watches.find(parent_path);
Strings paths_to_check_for_list_watches;
if (event_type == Coordination::Event::CREATED)
{
paths_to_check_for_list_watches.push_back(parent_path); /// Trigger list watches for parent
}
else if (event_type == Coordination::Event::DELETED)
{
paths_to_check_for_list_watches.push_back(path); /// Trigger both list watches for this path
paths_to_check_for_list_watches.push_back(parent_path); /// And for parent path
}
/// CHANGED event never trigger list wathes
for (const auto & path_to_check : paths_to_check_for_list_watches)
{
it = list_watches.find(path_to_check);
if (it != list_watches.end()) if (it != list_watches.end())
{ {
std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_list_response = std::make_shared<Coordination::ZooKeeperWatchResponse>(); std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_list_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
watch_list_response->path = parent_path; watch_list_response->path = path_to_check;
watch_list_response->xid = Coordination::WATCH_XID; watch_list_response->xid = Coordination::WATCH_XID;
watch_list_response->zxid = -1; watch_list_response->zxid = -1;
if (path_to_check == parent_path)
watch_list_response->type = Coordination::Event::CHILD; watch_list_response->type = Coordination::Event::CHILD;
else
watch_list_response->type = Coordination::Event::DELETED;
watch_list_response->state = Coordination::State::CONNECTED; watch_list_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : it->second) for (auto watcher_session : it->second)
result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response}); result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response});
list_watches.erase(it); list_watches.erase(it);
} }
}
return result; return result;
} }

View File

@ -1299,6 +1299,82 @@ TEST(CoordinationTest, TestEphemeralNodeRemove)
} }
TEST(CoordinationTest, TestRotateIntervalChanges)
{
using namespace Coordination;
ChangelogDirTest snapshots("./logs");
{
DB::KeeperLogStore changelog("./logs", 100, true);
changelog.init(0, 3);
for (size_t i = 1; i < 55; ++i)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
}
}
EXPECT_TRUE(fs::exists("./logs/changelog_0_99.bin"));
DB::KeeperLogStore changelog_1("./logs", 10, true);
changelog_1.init(0, 50);
for (size_t i = 0; i < 55; ++i)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(100 + i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
changelog_1.append(entry);
changelog_1.end_of_append_batch(0, 0);
}
EXPECT_TRUE(fs::exists("./logs/changelog_0_99.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_100_109.bin"));
DB::KeeperLogStore changelog_2("./logs", 7, true);
changelog_2.init(98, 55);
for (size_t i = 0; i < 17; ++i)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(200 + i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
changelog_2.append(entry);
changelog_2.end_of_append_batch(0, 0);
}
changelog_2.compact(105);
EXPECT_FALSE(fs::exists("./logs/changelog_0_99.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_100_109.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_110_116.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_117_123.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_124_130.bin"));
DB::KeeperLogStore changelog_3("./logs", 5, true);
changelog_3.init(116, 3);
for (size_t i = 0; i < 17; ++i)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(300 + i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
changelog_3.append(entry);
changelog_3.end_of_append_batch(0, 0);
}
changelog_3.compact(125);
EXPECT_FALSE(fs::exists("./logs/changelog_100_109.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_110_116.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_117_123.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_124_130.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_131_135.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_136_140.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_141_145.bin"));
}
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr)); Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));

View File

@ -98,7 +98,7 @@ class IColumn;
M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \ M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \
\ \
M(UInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) \ M(UInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) \
M(UInt64, replication_alter_columns_timeout, 60, "Wait for actions to change the table structure within the specified number of seconds. 0 - wait unlimited time.", 0) \ M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \
\ \
M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \ M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \
M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \ M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \
@ -520,6 +520,7 @@ class IColumn;
M(Bool, allow_experimental_window_functions, true, "Obsolete setting, does nothing.", 0) \ M(Bool, allow_experimental_window_functions, true, "Obsolete setting, does nothing.", 0) \
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "Obsolete setting, does nothing.", 0) \ M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "Obsolete setting, does nothing.", 0) \
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing.", 0) \ M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing.", 0) \
M(UInt64, replication_alter_columns_timeout, 60, "Obsolete setting, does nothing.", 0) \
/** The section above is for obsolete settings. Do not add anything there. */ /** The section above is for obsolete settings. Do not add anything there. */

View File

@ -32,8 +32,9 @@ FileDictionarySource::FileDictionarySource(
, sample_block{sample_block_} , sample_block{sample_block_}
, context(context_) , context(context_)
{ {
if (created_from_ddl && !pathStartsWith(filepath, context->getUserFilesPath())) auto user_files_path = context->getUserFilesPath();
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", filepath, context->getUserFilesPath()); if (created_from_ddl && !pathStartsWith(filepath, user_files_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", filepath, user_files_path);
} }

View File

@ -41,14 +41,15 @@ LibraryDictionarySource::LibraryDictionarySource(
, sample_block{sample_block_} , sample_block{sample_block_}
, context(Context::createCopy(context_)) , context(Context::createCopy(context_))
{ {
auto dictionaries_lib_path = context->getDictionariesLibPath();
bool path_checked = false; bool path_checked = false;
if (fs::is_symlink(path)) if (fs::is_symlink(path))
path_checked = symlinkStartsWith(path, context->getDictionariesLibPath()); path_checked = symlinkStartsWith(path, dictionaries_lib_path);
else else
path_checked = pathStartsWith(path, context->getDictionariesLibPath()); path_checked = pathStartsWith(path, dictionaries_lib_path);
if (created_from_ddl && !path_checked) if (created_from_ddl && !path_checked)
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, context->getDictionariesLibPath()); throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, dictionaries_lib_path);
if (!fs::exists(path)) if (!fs::exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path); throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path);

View File

@ -213,8 +213,9 @@ DictionaryPtr createCacheDictionaryLayout(
else else
{ {
auto storage_configuration = parseSSDCacheStorageConfiguration(config, full_name, layout_type, dictionary_layout_prefix, dict_lifetime); auto storage_configuration = parseSSDCacheStorageConfiguration(config, full_name, layout_type, dictionary_layout_prefix, dict_lifetime);
if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, global_context->getUserFilesPath())) auto user_files_path = global_context->getUserFilesPath();
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, global_context->getUserFilesPath()); if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, user_files_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, user_files_path);
storage = std::make_shared<SSDCacheDictionaryStorage<dictionary_key_type>>(storage_configuration); storage = std::make_shared<SSDCacheDictionaryStorage<dictionary_key_type>>(storage_configuration);
} }

View File

@ -550,7 +550,7 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A
} }
InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const
{ {
TableProperties properties; TableProperties properties;
TableLockHolder as_storage_lock; TableLockHolder as_storage_lock;
@ -589,10 +589,13 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
auto as_storage_metadata = as_storage->getInMemoryMetadataPtr(); auto as_storage_metadata = as_storage->getInMemoryMetadataPtr();
properties.columns = as_storage_metadata->getColumns(); properties.columns = as_storage_metadata->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines. /// Secondary indices and projections make sense only for MergeTree family of storage engines.
/// We should not copy them for other storages. /// We should not copy them for other storages.
if (create.storage && endsWith(create.storage->engine->name, "MergeTree")) if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
{
properties.indices = as_storage_metadata->getSecondaryIndices(); properties.indices = as_storage_metadata->getSecondaryIndices();
properties.projections = as_storage_metadata->getProjections().clone();
}
properties.constraints = as_storage_metadata->getConstraints(); properties.constraints = as_storage_metadata->getConstraints();
} }
@ -910,7 +913,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
} }
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way. /// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = setProperties(create); TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create);
DatabasePtr database; DatabasePtr database;
bool need_add_to_database = !create.temporary; bool need_add_to_database = !create.temporary;

View File

@ -74,7 +74,7 @@ private:
BlockIO createTable(ASTCreateQuery & create); BlockIO createTable(ASTCreateQuery & create);
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way. /// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties setProperties(ASTCreateQuery & create) const; TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const; void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
void setEngine(ASTCreateQuery & create) const; void setEngine(ASTCreateQuery & create) const;
AccessRightsElements getRequiredAccess() const; AccessRightsElements getRequiredAccess() const;

View File

@ -158,7 +158,7 @@ void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
auto event_time_seconds = event_time / 1000000; auto event_time_seconds = event_time / 1000000;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType()); columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
columns[i++]->insert(event_time); columns[i++]->insert(event_time);
columns[i++]->insert(IPv6ToBinary(address.host()).data()); columns[i++]->insertData(IPv6ToBinary(address.host()).data(), 16);
columns[i++]->insert(address.port()); columns[i++]->insert(address.port());
columns[i++]->insert(session_id); columns[i++]->insert(session_id);

View File

@ -287,7 +287,7 @@ static void injectVirtualColumnsImpl(
{ {
ColumnPtr column; ColumnPtr column;
if (rows) if (rows)
column = DataTypeUUID().createColumnConst(rows, task->data_part->uuid)->convertToFullColumnIfConst(); column = DataTypeUUID().createColumnConst(rows, part->uuid)->convertToFullColumnIfConst();
else else
column = DataTypeUUID().createColumn(); column = DataTypeUUID().createColumn();
@ -306,7 +306,7 @@ static void injectVirtualColumnsImpl(
else if (virtual_column_name == "_partition_value") else if (virtual_column_name == "_partition_value")
{ {
if (rows) if (rows)
inserter.insertPartitionValueColumn(rows, task->data_part->partition.value, partition_value_type, virtual_column_name); inserter.insertPartitionValueColumn(rows, part->partition.value, partition_value_type, virtual_column_name);
else else
inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name); inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name);
} }

View File

@ -757,16 +757,20 @@ DataTypePtr MergeTreeData::getPartitionValueType() const
} }
Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const Block MergeTreeData::getSampleBlockWithVirtualColumns() const
{ {
DataTypePtr partition_value_type = getPartitionValueType(); DataTypePtr partition_value_type = getPartitionValueType();
bool has_partition_value = typeid_cast<const DataTypeTuple *>(partition_value_type.get()); return {
Block block{
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"), ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"), ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid"), ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid"),
ColumnWithTypeAndName(partition_value_type->createColumn(), partition_value_type, "_partition_value")}; ColumnWithTypeAndName(partition_value_type->createColumn(), partition_value_type, "_partition_value")};
}
Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const
{
auto block = getSampleBlockWithVirtualColumns();
MutableColumns columns = block.mutateColumns(); MutableColumns columns = block.mutateColumns();
auto & part_column = columns[0]; auto & part_column = columns[0];
@ -774,6 +778,7 @@ Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPar
auto & part_uuid_column = columns[2]; auto & part_uuid_column = columns[2];
auto & partition_value_column = columns[3]; auto & partition_value_column = columns[3];
bool has_partition_value = typeid_cast<const ColumnTuple *>(partition_value_column.get());
for (const auto & part_or_projection : parts) for (const auto & part_or_projection : parts)
{ {
const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get(); const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get();
@ -3465,7 +3470,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(
{ {
for (const auto & part : range) for (const auto & part : range)
{ {
for (const auto & [p_name, projection_part] : part->getProjectionParts()) for (const auto & [_, projection_part] : part->getProjectionParts())
res.push_back(projection_part); res.push_back(projection_part);
} }
} }
@ -4151,6 +4156,10 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
if (auto * select = query_ptr->as<ASTSelectQuery>(); select && select->final()) if (auto * select = query_ptr->as<ASTSelectQuery>(); select && select->final())
return false; return false;
// Currently projections don't support sampling yet.
if (settings.parallel_replicas_count > 1)
return false;
InterpreterSelectQuery select( InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias()); query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
const auto & analysis_result = select.getAnalysisResult(); const auto & analysis_result = select.getAnalysisResult();
@ -4194,13 +4203,13 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
candidate.remove_where_filter = analysis_result.remove_where_filter; candidate.remove_where_filter = analysis_result.remove_where_filter;
candidate.before_where = analysis_result.before_where->clone(); candidate.before_where = analysis_result.before_where->clone();
required_columns = candidate.before_where->foldActionsByProjection( auto new_required_columns = candidate.before_where->foldActionsByProjection(
required_columns, required_columns,
projection.sample_block_for_keys, projection.sample_block_for_keys,
candidate.where_column_name); candidate.where_column_name);
if (new_required_columns.empty() && !required_columns.empty())
if (required_columns.empty())
return false; return false;
required_columns = std::move(new_required_columns);
candidate.before_where->addAggregatesViaProjection(aggregates); candidate.before_where->addAggregatesViaProjection(aggregates);
} }
@ -4214,33 +4223,35 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
for (const auto & column : prewhere_actions->getResultColumns()) for (const auto & column : prewhere_actions->getResultColumns())
required_columns.erase(column.name); required_columns.erase(column.name);
{
// Prewhere_action should not add missing keys. // Prewhere_action should not add missing keys.
prewhere_required_columns = prewhere_actions->foldActionsByProjection( auto new_prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false); prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false);
if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
if (prewhere_required_columns.empty())
return false; return false;
prewhere_required_columns = std::move(new_prewhere_required_columns);
candidate.prewhere_info->prewhere_actions = prewhere_actions; candidate.prewhere_info->prewhere_actions = prewhere_actions;
}
if (candidate.prewhere_info->row_level_filter) if (candidate.prewhere_info->row_level_filter)
{ {
auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone(); auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone();
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection( auto new_prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false); prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false);
if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
if (prewhere_required_columns.empty())
return false; return false;
prewhere_required_columns = std::move(new_prewhere_required_columns);
candidate.prewhere_info->row_level_filter = row_level_filter_actions; candidate.prewhere_info->row_level_filter = row_level_filter_actions;
} }
if (candidate.prewhere_info->alias_actions) if (candidate.prewhere_info->alias_actions)
{ {
auto alias_actions = candidate.prewhere_info->alias_actions->clone(); auto alias_actions = candidate.prewhere_info->alias_actions->clone();
prewhere_required_columns auto new_prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false); = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false);
if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
if (prewhere_required_columns.empty())
return false; return false;
prewhere_required_columns = std::move(new_prewhere_required_columns);
candidate.prewhere_info->alias_actions = alias_actions; candidate.prewhere_info->alias_actions = alias_actions;
} }
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
@ -4259,11 +4270,20 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
return match; return match;
}; };
for (const auto & projection : metadata_snapshot->projections) auto virtual_block = getSampleBlockWithVirtualColumns();
auto add_projection_candidate = [&](const ProjectionDescription & projection)
{ {
ProjectionCandidate candidate{}; ProjectionCandidate candidate{};
candidate.desc = &projection; candidate.desc = &projection;
auto sample_block = projection.sample_block;
auto sample_block_for_keys = projection.sample_block_for_keys;
for (const auto & column : virtual_block)
{
sample_block.insertUnique(column);
sample_block_for_keys.insertUnique(column);
}
if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection) if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
{ {
bool match = true; bool match = true;
@ -4271,7 +4291,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
// Let's first check if all aggregates are provided by current projection // Let's first check if all aggregates are provided by current projection
for (const auto & aggregate : select.getQueryAnalyzer()->aggregates()) for (const auto & aggregate : select.getQueryAnalyzer()->aggregates())
{ {
const auto * column = projection.sample_block.findByName(aggregate.column_name); const auto * column = sample_block.findByName(aggregate.column_name);
if (column) if (column)
{ {
aggregates.insert(*column); aggregates.insert(*column);
@ -4284,25 +4304,25 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
} }
if (!match) if (!match)
continue; return;
// Check if all aggregation keys can be either provided by some action, or by current // Check if all aggregation keys can be either provided by some action, or by current
// projection directly. Reshape the `before_aggregation` action DAG so that it only // projection directly. Reshape the `before_aggregation` action DAG so that it only
// needs to provide aggregation keys, and certain children DAG might be substituted by // needs to provide aggregation keys, and the DAG of certain child might be substituted
// some keys in projection. // by some keys in projection.
candidate.before_aggregation = analysis_result.before_aggregation->clone(); candidate.before_aggregation = analysis_result.before_aggregation->clone();
auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys); auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, sample_block_for_keys);
// TODO Let's find out the exact required_columns for keys. // TODO Let's find out the exact required_columns for keys.
if (required_columns.empty() && (!keys.empty() && !candidate.before_aggregation->getRequiredColumns().empty())) if (required_columns.empty() && (!keys.empty() && !candidate.before_aggregation->getRequiredColumns().empty()))
continue; return;
if (analysis_result.optimize_aggregation_in_order) if (analysis_result.optimize_aggregation_in_order)
{ {
for (const auto & key : keys) for (const auto & key : keys)
{ {
auto actions_dag = analysis_result.before_aggregation->clone(); auto actions_dag = analysis_result.before_aggregation->clone();
actions_dag->foldActionsByProjection({key}, projection.sample_block_for_keys); actions_dag->foldActionsByProjection({key}, sample_block_for_keys);
candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings)); candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings));
} }
} }
@ -4311,7 +4331,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map); candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map);
candidate.before_aggregation->addAggregatesViaProjection(aggregates); candidate.before_aggregation->addAggregatesViaProjection(aggregates);
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block_for_keys, aggregates)) if (rewrite_before_where(candidate, projection, required_columns, sample_block_for_keys, aggregates))
{ {
candidate.required_columns = {required_columns.begin(), required_columns.end()}; candidate.required_columns = {required_columns.begin(), required_columns.end()};
for (const auto & aggregate : aggregates) for (const auto & aggregate : aggregates)
@ -4328,13 +4348,16 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
for (const auto & column : actions->getRequiredColumns()) for (const auto & column : actions->getRequiredColumns())
required_columns.insert(column.name); required_columns.insert(column.name);
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block, {})) if (rewrite_before_where(candidate, projection, required_columns, sample_block, {}))
{ {
candidate.required_columns = {required_columns.begin(), required_columns.end()}; candidate.required_columns = {required_columns.begin(), required_columns.end()};
candidates.push_back(std::move(candidate)); candidates.push_back(std::move(candidate));
} }
} }
} };
for (const auto & projection : metadata_snapshot->projections)
add_projection_candidate(projection);
// Let's select the best projection to execute the query. // Let's select the best projection to execute the query.
if (!candidates.empty()) if (!candidates.empty())
@ -4409,6 +4432,14 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
if (!selected_candidate) if (!selected_candidate)
return false; return false;
else if (min_sum_marks == 0)
{
/// If selected_projection indicated an empty result set. Remember it in query_info but
/// don't use projection to run the query, because projection pipeline with empty result
/// set will not work correctly with empty_result_for_aggregation_by_empty_set.
query_info.merge_tree_empty_result = true;
return false;
}
if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate) if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate)
{ {

View File

@ -795,6 +795,9 @@ public:
/// Return the partition expression types as a Tuple type. Return DataTypeUInt8 if partition expression is empty. /// Return the partition expression types as a Tuple type. Return DataTypeUInt8 if partition expression is empty.
DataTypePtr getPartitionValueType() const; DataTypePtr getPartitionValueType() const;
/// Construct a sample block of virtual columns.
Block getSampleBlockWithVirtualColumns() const;
/// Construct a block consisting only of possible virtual columns for part pruning. /// Construct a block consisting only of possible virtual columns for part pruning.
/// If one_part is true, fill in at most one part. /// If one_part is true, fill in at most one part.
Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const; Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const;

View File

@ -89,6 +89,7 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
future_part_type = std::min(future_part_type, part->getType()); future_part_type = std::min(future_part_type, part->getType());
} }
/// NOTE: We don't support merging into an in-memory part yet.
auto chosen_type = parts_.front()->storage.choosePartTypeOnDisk(sum_bytes_uncompressed, sum_rows); auto chosen_type = parts_.front()->storage.choosePartTypeOnDisk(sum_bytes_uncompressed, sum_rows);
future_part_type = std::min(future_part_type, chosen_type); future_part_type = std::min(future_part_type, chosen_type);
assign(std::move(parts_), future_part_type); assign(std::move(parts_), future_part_type);
@ -2014,10 +2015,19 @@ void MergeTreeDataMergerMutator::writeWithProjections(
std::map<String, MergeTreeData::MutableDataPartsVector> projection_parts; std::map<String, MergeTreeData::MutableDataPartsVector> projection_parts;
Block block; Block block;
std::vector<SquashingTransform> projection_squashes; std::vector<SquashingTransform> projection_squashes;
const auto & settings = context->getSettingsRef();
for (size_t i = 0, size = projections_to_build.size(); i < size; ++i) for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
{ {
projection_squashes.emplace_back(65536, 65536 * 256); // If the parent part is an in-memory part, squash projection output into one block and
// build in-memory projection because we don't support merging into a new in-memory part.
// Otherwise we split the materialization into multiple stages similar to the process of
// INSERT SELECT query.
if (new_data_part->getType() == MergeTreeDataPartType::IN_MEMORY)
projection_squashes.emplace_back(0, 0);
else
projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
} }
while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read())) while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read()))
{ {
if (minmax_idx) if (minmax_idx)
@ -2028,26 +2038,10 @@ void MergeTreeDataMergerMutator::writeWithProjections(
for (size_t i = 0, size = projections_to_build.size(); i < size; ++i) for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
{ {
const auto & projection = projections_to_build[i]->projection; const auto & projection = projections_to_build[i]->projection;
auto in = InterpreterSelectQuery( auto projection_block = projection_squashes[i].add(projection.calculate(block, context));
projection.query_ast,
context,
Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
SelectQueryOptions{
projection.type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns : QueryProcessingStage::WithMergeableState})
.execute()
.getInputStream();
in = std::make_shared<SquashingBlockInputStream>(in, block.rows(), std::numeric_limits<UInt64>::max());
in->readPrefix();
auto & projection_squash = projection_squashes[i];
auto projection_block = projection_squash.add(in->read());
if (in->read())
throw Exception("Projection cannot increase the number of rows in a block", ErrorCodes::LOGICAL_ERROR);
in->readSuffix();
if (projection_block) if (projection_block)
{ projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
projection_parts[projection.name].emplace_back( data, log, projection_block, projection, new_data_part.get(), ++block_num));
MergeTreeDataWriter::writeTempProjectionPart(data, log, projection_block, projection, new_data_part.get(), ++block_num));
}
} }
merge_entry->rows_written += block.rows(); merge_entry->rows_written += block.rows();

View File

@ -94,6 +94,42 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec); MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec);
out.writePrefix(); out.writePrefix();
out.write(block); out.write(block);
const auto & projections = metadata_snapshot->getProjections();
for (const auto & [projection_name, projection] : projection_parts)
{
if (projections.has(projection_name))
{
String projection_destination_path = fs::path(destination_path) / projection_name / ".proj";
if (disk->exists(projection_destination_path))
{
throw Exception(
ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Could not flush projection part {}. Projection part in {} already exists",
projection_name,
fullPath(disk, projection_destination_path));
}
auto projection_part = asInMemoryPart(projection);
auto projection_type = storage.choosePartTypeOnDisk(projection_part->block.bytes(), rows_count);
MergeTreePartInfo projection_info("all", 0, 0, 0);
auto projection_data_part
= storage.createPart(projection_name, projection_type, projection_info, volume, projection_name + ".proj", parent_part);
projection_data_part->is_temp = false; // clean up will be done on parent part
projection_data_part->setColumns(projection->getColumns());
disk->createDirectories(projection_destination_path);
const auto & desc = projections.get(name);
auto projection_compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices());
MergedBlockOutputStream projection_out(
projection_data_part, desc.metadata, projection_part->columns, projection_indices, projection_compression_codec);
projection_out.writePrefix();
projection_out.write(projection_part->block);
projection_out.writeSuffixAndFinalizePart(projection_data_part);
new_data_part->addProjectionPart(projection_name, std::move(projection_data_part));
}
}
out.writeSuffixAndFinalizePart(new_data_part); out.writeSuffixAndFinalizePart(new_data_part);
} }

View File

@ -132,6 +132,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
{ {
if (query_info.merge_tree_empty_result)
return std::make_unique<QueryPlan>();
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
if (!query_info.projection) if (!query_info.projection)
{ {
@ -181,7 +184,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
max_block_numbers_to_read, max_block_numbers_to_read,
query_info.projection->merge_tree_projection_select_result_ptr); query_info.projection->merge_tree_projection_select_result_ptr);
if (plan) if (plan->isInitialized())
{ {
// If `before_where` is not empty, transform input blocks by adding needed columns // If `before_where` is not empty, transform input blocks by adding needed columns
// originated from key columns. We already project the block at the end, using // originated from key columns. We already project the block at the end, using
@ -237,7 +240,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
ordinary_query_plan.addStep(std::move(where_step)); ordinary_query_plan.addStep(std::move(where_step));
} }
ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline); ordinary_pipe = ordinary_query_plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
} }
if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate) if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
@ -351,12 +355,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
pipes.emplace_back(std::move(projection_pipe)); pipes.emplace_back(std::move(projection_pipe));
pipes.emplace_back(std::move(ordinary_pipe)); pipes.emplace_back(std::move(ordinary_pipe));
auto pipe = Pipe::unitePipes(std::move(pipes)); auto pipe = Pipe::unitePipes(std::move(pipes));
pipe.resize(1); auto plan = std::make_unique<QueryPlan>();
if (pipe.empty())
return plan;
pipe.resize(1);
auto step = std::make_unique<ReadFromStorageStep>( auto step = std::make_unique<ReadFromStorageStep>(
std::move(pipe), std::move(pipe),
fmt::format("MergeTree(with {} projection {})", query_info.projection->desc->type, query_info.projection->desc->name)); fmt::format("MergeTree(with {} projection {})", query_info.projection->desc->type, query_info.projection->desc->name));
auto plan = std::make_unique<QueryPlan>();
plan->addStep(std::move(step)); plan->addStep(std::move(step));
return plan; return plan;
} }

View File

@ -386,31 +386,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
sync_guard = disk->getDirectorySyncGuard(full_path); sync_guard = disk->getDirectorySyncGuard(full_path);
} }
if (metadata_snapshot->hasProjections())
{
for (const auto & projection : metadata_snapshot->getProjections())
{
auto in = InterpreterSelectQuery(
projection.query_ast,
context,
Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
SelectQueryOptions{
projection.type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns : QueryProcessingStage::WithMergeableState})
.execute()
.getInputStream();
in = std::make_shared<SquashingBlockInputStream>(in, block.rows(), std::numeric_limits<UInt64>::max());
in->readPrefix();
auto projection_block = in->read();
if (in->read())
throw Exception("Projection cannot grow block rows", ErrorCodes::LOGICAL_ERROR);
in->readSuffix();
if (projection_block.rows())
{
new_data_part->addProjectionPart(projection.name, writeProjectionPart(projection_block, projection, new_data_part.get()));
}
}
}
if (metadata_snapshot->hasRowsTTL()) if (metadata_snapshot->hasRowsTTL())
updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true); updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
@ -439,6 +414,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
out.writePrefix(); out.writePrefix();
out.writeWithPermutation(block, perm_ptr); out.writeWithPermutation(block, perm_ptr);
for (const auto & projection : metadata_snapshot->getProjections())
{
auto projection_block = projection.calculate(block, context);
if (projection_block.rows())
new_data_part->addProjectionPart(
projection.name, writeProjectionPart(data, log, projection_block, projection, new_data_part.get()));
}
out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert); out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
@ -449,18 +432,28 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
} }
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl( MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
MergeTreeData & data, const String part_name,
MergeTreeDataPartType part_type,
const String & relative_path,
bool is_temp,
const IMergeTreeDataPart * parent_part,
const MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
Block block, Block block,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot)
MergeTreeData::MutableDataPartPtr && new_data_part)
{ {
MergeTreePartInfo new_part_info("all", 0, 0, 0);
auto new_data_part = data.createPart(
part_name,
part_type,
new_part_info,
parent_part->volume,
relative_path,
parent_part);
new_data_part->is_temp = is_temp;
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames()); NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
MergeTreePartition partition{};
IMergeTreeDataPart::MinMaxIndex minmax_idx{};
new_data_part->setColumns(columns); new_data_part->setColumns(columns);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
if (new_data_part->isStoredOnDisk()) if (new_data_part->isStoredOnDisk())
{ {
@ -523,27 +516,41 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk());
return std::move(new_data_part); return new_data_part;
} }
MergeTreeData::MutableDataPartPtr MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart(
MergeTreeDataWriter::writeProjectionPart(Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part) MergeTreeData & data, Poco::Logger * log, Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part)
{ {
String part_name = projection.name;
MergeTreeDataPartType part_type;
if (parent_part->getType() == MergeTreeDataPartType::IN_MEMORY)
{
part_type = MergeTreeDataPartType::IN_MEMORY;
}
else
{
/// Size of part would not be greater than block.bytes() + epsilon /// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes(); size_t expected_size = block.bytes();
// just check if there is enough space on parent volume // just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->volume); data.reserveSpace(expected_size, parent_part->volume);
part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
}
String part_name = projection.name; return writeProjectionPartImpl(
MergeTreePartInfo new_part_info("all", 0, 0, 0); part_name,
auto new_data_part = data.createPart( part_type,
part_name, data.choosePartType(expected_size, block.rows()), new_part_info, parent_part->volume, part_name + ".proj", parent_part); part_name + ".proj" /* relative_path */,
new_data_part->is_temp = false; // clean up will be done on parent part false /* is_temp */,
parent_part,
return writeProjectionPartImpl(data, log, block, projection.metadata, std::move(new_data_part)); data,
log,
block,
projection.metadata);
} }
/// This is used for projection materialization process which may contain multiple stages of
/// projection part merges.
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart( MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart(
MergeTreeData & data, MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
@ -552,24 +559,50 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart(
const IMergeTreeDataPart * parent_part, const IMergeTreeDataPart * parent_part,
size_t block_num) size_t block_num)
{ {
String part_name = fmt::format("{}_{}", projection.name, block_num);
MergeTreeDataPartType part_type;
if (parent_part->getType() == MergeTreeDataPartType::IN_MEMORY)
{
part_type = MergeTreeDataPartType::IN_MEMORY;
}
else
{
/// Size of part would not be greater than block.bytes() + epsilon /// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes(); size_t expected_size = block.bytes();
// just check if there is enough space on parent volume // just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->volume); data.reserveSpace(expected_size, parent_part->volume);
part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
}
String part_name = fmt::format("{}_{}", projection.name, block_num); return writeProjectionPartImpl(
MergeTreePartInfo new_part_info("all", 0, 0, 0);
auto new_data_part = data.createPart(
part_name, part_name,
data.choosePartType(expected_size, block.rows()), part_type,
new_part_info, "tmp_insert_" + part_name + ".proj" /* relative_path */,
parent_part->volume, true /* is_temp */,
"tmp_insert_" + part_name + ".proj", parent_part,
parent_part); data,
new_data_part->is_temp = true; // It's part for merge log,
block,
projection.metadata);
}
return writeProjectionPartImpl(data, log, block, projection.metadata, std::move(new_data_part)); MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeInMemoryProjectionPart(
const MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
const IMergeTreeDataPart * parent_part)
{
return writeProjectionPartImpl(
projection.name,
MergeTreeDataPartType::IN_MEMORY,
projection.name + ".proj" /* relative_path */,
false /* is_temp */,
parent_part,
data,
log,
block,
projection.metadata);
} }
} }

View File

@ -49,9 +49,15 @@ public:
MergeTreeData::MutableDataPartPtr MergeTreeData::MutableDataPartPtr
writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
MergeTreeData::MutableDataPartPtr writeProjectionPart( /// For insertion.
Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part); static MergeTreeData::MutableDataPartPtr writeProjectionPart(
MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
const IMergeTreeDataPart * parent_part);
/// For mutation: MATERIALIZE PROJECTION.
static MergeTreeData::MutableDataPartPtr writeTempProjectionPart( static MergeTreeData::MutableDataPartPtr writeTempProjectionPart(
MergeTreeData & data, MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
@ -60,15 +66,27 @@ public:
const IMergeTreeDataPart * parent_part, const IMergeTreeDataPart * parent_part,
size_t block_num); size_t block_num);
/// For WriteAheadLog AddPart.
static MergeTreeData::MutableDataPartPtr writeInMemoryProjectionPart(
const MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
const IMergeTreeDataPart * parent_part);
Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation); Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation);
private: private:
static MergeTreeData::MutableDataPartPtr writeProjectionPartImpl( static MergeTreeData::MutableDataPartPtr writeProjectionPartImpl(
MergeTreeData & data, const String part_name,
MergeTreeDataPartType part_type,
const String & relative_path,
bool is_temp,
const IMergeTreeDataPart * parent_part,
const MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
Block block, Block block,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot);
MergeTreeData::MutableDataPartPtr && new_data_part);
MergeTreeData & data; MergeTreeData & data;

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h> #include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h> #include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h> #include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <IO/MemoryReadWriteBuffer.h> #include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
@ -31,6 +32,7 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
, name(name_) , name(name_)
, path(storage.getRelativeDataPath() + name_) , path(storage.getRelativeDataPath() + name_)
, pool(storage.getContext()->getSchedulePool()) , pool(storage.getContext()->getSchedulePool())
, log(&Poco::Logger::get(storage.getLogName() + " (WriteAheadLog)"))
{ {
init(); init();
sync_task = pool.createTask("MergeTreeWriteAheadLog::sync", [this] sync_task = pool.createTask("MergeTreeWriteAheadLog::sync", [this]
@ -172,8 +174,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
|| e.code() == ErrorCodes::BAD_DATA_PART_NAME || e.code() == ErrorCodes::BAD_DATA_PART_NAME
|| e.code() == ErrorCodes::CORRUPTED_DATA) || e.code() == ErrorCodes::CORRUPTED_DATA)
{ {
LOG_WARNING(&Poco::Logger::get(storage.getLogName() + " (WriteAheadLog)"), LOG_WARNING(log, "WAL file '{}' is broken. {}", path, e.displayText());
"WAL file '{}' is broken. {}", path, e.displayText());
/// If file is broken, do not write new parts to it. /// If file is broken, do not write new parts to it.
/// But if it contains any part rotate and save them. /// But if it contains any part rotate and save them.
@ -203,6 +204,15 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
part_out.writePrefix(); part_out.writePrefix();
part_out.write(block); part_out.write(block);
for (const auto & projection : metadata_snapshot->getProjections())
{
auto projection_block = projection.calculate(block, context);
if (projection_block.rows())
part->addProjectionPart(
projection.name,
MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get()));
}
part_out.writeSuffixAndFinalizePart(part); part_out.writeSuffixAndFinalizePart(part);
min_block_number = std::min(min_block_number, part->info.min_block); min_block_number = std::min(min_block_number, part->info.min_block);

View File

@ -91,6 +91,8 @@ private:
bool sync_scheduled = false; bool sync_scheduled = false;
mutable std::mutex write_mutex; mutable std::mutex write_mutex;
Poco::Logger * log;
}; };
} }

View File

@ -34,6 +34,7 @@ public:
void writeSuffix() override; void writeSuffix() override;
/// Finilize writing part and fill inner structures /// Finilize writing part and fill inner structures
/// If part is new and contains projections, they should be added before invoking this method.
void writeSuffixAndFinalizePart( void writeSuffixAndFinalizePart(
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,
bool sync = false, bool sync = false,

View File

@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
/// This wait in background schedule pool is useless. It'd be /// This wait in background schedule pool is useless. It'd be
/// better to have some notification which will call `step` /// better to have some notification which will call `step`
/// function when all replicated will finish. TODO. /// function when all replicated will finish. TODO.
storage.waitForAllReplicasToProcessLogEntry(log_entry, true); storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1);
} }
{ {
@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created; String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1);
} }
{ {
@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created; String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1);
} }
{ {
@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created; String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1);
} }
{ {
@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
{ {
ReplicatedMergeTreeLogEntry log_entry; ReplicatedMergeTreeLogEntry log_entry;
if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false))
storage.waitForAllReplicasToProcessLogEntry(log_entry, true); storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1);
} }
{ {

View File

@ -14,6 +14,7 @@
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/SquashingBlockInputStream.h>
namespace DB namespace DB
{ {
@ -23,6 +24,7 @@ namespace ErrorCodes
extern const int NO_SUCH_PROJECTION_IN_TABLE; extern const int NO_SUCH_PROJECTION_IN_TABLE;
extern const int ILLEGAL_PROJECTION; extern const int ILLEGAL_PROJECTION;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}; };
const char * ProjectionDescription::typeToString(Type type) const char * ProjectionDescription::typeToString(Type type)
@ -192,6 +194,28 @@ void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription &
*this = getProjectionFromAST(definition_ast, new_columns, query_context); *this = getProjectionFromAST(definition_ast, new_columns, query_context);
} }
Block ProjectionDescription::calculate(const Block & block, ContextPtr context) const
{
auto in = InterpreterSelectQuery(
query_ast,
context,
Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
SelectQueryOptions{
type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns
: QueryProcessingStage::WithMergeableState})
.execute()
.getInputStream();
in = std::make_shared<SquashingBlockInputStream>(in, block.rows(), 0);
in->readPrefix();
auto ret = in->read();
if (in->read())
throw Exception("Projection cannot increase the number of rows in a block", ErrorCodes::LOGICAL_ERROR);
in->readSuffix();
return ret;
}
String ProjectionsDescription::toString() const String ProjectionsDescription::toString() const
{ {
if (empty()) if (empty())

View File

@ -85,6 +85,8 @@ struct ProjectionDescription
void recalculateWithNewColumns(const ColumnsDescription & new_columns, ContextPtr query_context); void recalculateWithNewColumns(const ColumnsDescription & new_columns, ContextPtr query_context);
bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const; bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;
Block calculate(const Block & block, ContextPtr context) const;
}; };
/// All projections in storage /// All projections in storage

View File

@ -163,6 +163,7 @@ struct SelectQueryInfo
std::optional<ProjectionCandidate> projection; std::optional<ProjectionCandidate> projection;
bool ignore_projections = false; bool ignore_projections = false;
bool is_projection_query = false; bool is_projection_query = false;
bool merge_tree_empty_result = false;
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr; MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
}; };

View File

@ -338,9 +338,10 @@ Pipe StorageMerge::read(
auto pipe = Pipe::unitePipes(std::move(pipes)); auto pipe = Pipe::unitePipes(std::move(pipes));
if (!pipe.empty()) if (!pipe.empty() && !query_info.input_order_info)
// It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time. // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
// Using narrowPipe instead. // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
// because narrowPipe doesn't preserve order.
narrowPipe(pipe, num_streams); narrowPipe(pipe, num_streams);
return pipe; return pipe;

View File

@ -4732,12 +4732,10 @@ bool StorageReplicatedMergeTree::optimize(
} }
} }
if (query_context->getSettingsRef().replication_alter_partitions_sync != 0) table_lock.reset();
{
/// NOTE Table lock must not be held while waiting. Some combination of R-W-R locks from different threads will yield to deadlock.
for (auto & merge_entry : merge_entries) for (auto & merge_entry : merge_entries)
waitForAllReplicasToProcessLogEntry(merge_entry, false); waitForLogEntryToBeProcessedIfNecessary(merge_entry, query_context);
}
return true; return true;
} }
@ -5048,20 +5046,8 @@ void StorageReplicatedMergeTree::alter(
table_lock_holder.reset(); table_lock_holder.reset();
std::vector<String> unwaited;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
{
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false); waitForLogEntryToBeProcessedIfNecessary(*alter_entry, query_context, "Some replicas doesn't finish metadata alter: ");
}
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
{
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
waitForReplicaToProcessLogEntry(replica_name, *alter_entry);
}
if (!unwaited.empty())
throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED);
if (mutation_znode) if (mutation_znode)
{ {
@ -5212,11 +5198,7 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach,
dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true); dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true);
/// If necessary, wait until the operation is performed on itself or on all replicas. waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry);
} }
void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context)
@ -5233,12 +5215,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
if (did_drop) if (did_drop)
{ {
/// If necessary, wait until the operation is performed on itself or on all replicas. waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry);
cleanLastPartNode(partition_id); cleanLastPartNode(partition_id);
} }
} }
@ -5257,13 +5234,17 @@ void StorageReplicatedMergeTree::truncate(
Strings partitions = zookeeper->getChildren(fs::path(zookeeper_path) / "block_numbers"); Strings partitions = zookeeper->getChildren(fs::path(zookeeper_path) / "block_numbers");
std::vector<std::unique_ptr<LogEntry>> entries_to_wait;
entries_to_wait.reserve(partitions.size());
for (String & partition_id : partitions) for (String & partition_id : partitions)
{ {
LogEntry entry; auto entry = std::make_unique<LogEntry>();
if (dropAllPartsInPartition(*zookeeper, partition_id, *entry, query_context, false))
if (dropAllPartsInPartition(*zookeeper, partition_id, entry, query_context, false)) entries_to_wait.push_back(std::move(entry));
waitForAllReplicasToProcessLogEntry(entry);
} }
for (const auto & entry : entries_to_wait)
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
} }
@ -5421,19 +5402,20 @@ StorageReplicatedMergeTree::allocateBlockNumber(
} }
Strings StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry(
const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
{ {
LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name); LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name);
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(fs::path(table_zookeeper_path) / "replicas"); Strings replicas = zookeeper->getChildren(fs::path(table_zookeeper_path) / "replicas");
Strings unwaited; Strings unwaited;
bool wait_for_inactive = wait_for_inactive_timeout != 0;
for (const String & replica : replicas) for (const String & replica : replicas)
{ {
if (wait_for_non_active || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active")) if (wait_for_inactive || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"))
{ {
if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_non_active)) if (!tryWaitForReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout))
unwaited.push_back(replica); unwaited.push_back(replica);
} }
else else
@ -5446,16 +5428,38 @@ Strings StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry(
return unwaited; return unwaited;
} }
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(
Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout, const String & error_context)
const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{ {
return waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_non_active); Strings unfinished_replicas = tryWaitForAllReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout);
if (unfinished_replicas.empty())
return;
throw Exception(ErrorCodes::UNFINISHED, "{}Timeout exceeded while waiting for replicas {} to process entry {}. "
"Probably some replicas are inactive", error_context, fmt::join(unfinished_replicas, ", "), entry.znode_name);
} }
void StorageReplicatedMergeTree::waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context)
{
/// If necessary, wait until the operation is performed on itself or on all replicas.
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
{
bool finished = tryWaitForReplicaToProcessLogEntry(zookeeper_path, replica_name, entry, wait_for_inactive_timeout);
if (!finished)
{
throw Exception(ErrorCodes::UNFINISHED, "{}Log entry {} is not precessed on local replica, "
"most likely because the replica was shut down.", error_context, entry.znode_name);
}
}
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
{
waitForAllReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout, error_context);
}
}
bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
{ {
String entry_str = entry.toString(); String entry_str = entry.toString();
String log_node_name; String log_node_name;
@ -5473,18 +5477,27 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
*/ */
bool waiting_itself = replica == replica_name; bool waiting_itself = replica == replica_name;
/// Do not wait if timeout is zero
bool wait_for_inactive = wait_for_inactive_timeout != 0;
/// Wait for unlimited time if timeout is negative
bool check_timeout = wait_for_inactive_timeout > 0;
Stopwatch time_waiting;
const auto & stop_waiting = [&]() const auto & stop_waiting = [&]()
{ {
bool stop_waiting_itself = waiting_itself && partial_shutdown_called; bool stop_waiting_itself = waiting_itself && partial_shutdown_called;
bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"); bool timeout_exceeded = check_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds();
return is_dropped || stop_waiting_itself || stop_waiting_non_active; bool stop_waiting_inactive = (!wait_for_inactive || timeout_exceeded)
&& !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active");
return is_dropped || stop_waiting_itself || stop_waiting_inactive;
}; };
/// Don't recheck ZooKeeper too often /// Don't recheck ZooKeeper too often
constexpr auto event_wait_timeout_ms = 3000; constexpr auto event_wait_timeout_ms = 3000;
if (startsWith(entry.znode_name, "log-")) if (!startsWith(entry.znode_name, "log-"))
throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);
{ {
/// Take the number from the node name `log-xxxxxxxxxx`. /// Take the number from the node name `log-xxxxxxxxxx`.
UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10)); UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10));
@ -5493,13 +5506,17 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
LOG_DEBUG(log, "Waiting for {} to pull {} to queue", replica, log_node_name); LOG_DEBUG(log, "Waiting for {} to pull {} to queue", replica, log_node_name);
/// Let's wait until entry gets into the replica queue. /// Let's wait until entry gets into the replica queue.
bool pulled_to_queue = false;
while (!stop_waiting()) while (!stop_waiting())
{ {
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer", nullptr, event); String log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index) if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
{
pulled_to_queue = true;
break; break;
}
/// Wait with timeout because we can be already shut down, but not dropped. /// Wait with timeout because we can be already shut down, but not dropped.
/// So log_pointer node will exist, but we will never update it because all background threads already stopped. /// So log_pointer node will exist, but we will never update it because all background threads already stopped.
@ -5507,9 +5524,10 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
/// but the query will never finish because the drop already shut down the table. /// but the query will never finish because the drop already shut down the table.
event->tryWait(event_wait_timeout_ms); event->tryWait(event_wait_timeout_ms);
} }
if (!pulled_to_queue)
return false;
} }
else
throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);
LOG_DEBUG(log, "Looking for node corresponding to {} in {} queue", log_node_name, replica); LOG_DEBUG(log, "Looking for node corresponding to {} in {} queue", log_node_name, replica);
@ -5547,13 +5565,6 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
} }
bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{
return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_non_active);
}
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{ {
auto zookeeper = tryGetZooKeeper(); auto zookeeper = tryGetZooKeeper();
@ -6562,13 +6573,10 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread.wakeup(); cleanup_thread.wakeup();
/// If necessary, wait until the operation is performed on all replicas.
if (query_context->getSettingsRef().replication_alter_partitions_sync > 1)
{
lock2.reset(); lock2.reset();
lock1.reset(); lock1.reset();
waitForAllReplicasToProcessLogEntry(entry);
} waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
} }
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context)
@ -6767,12 +6775,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread.wakeup(); cleanup_thread.wakeup();
if (query_context->getSettingsRef().replication_alter_partitions_sync > 1)
{
lock2.reset(); lock2.reset();
dest_table_storage->waitForAllReplicasToProcessLogEntry(entry);
} dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
/// Create DROP_RANGE for the source table /// Create DROP_RANGE for the source table
Coordination::Requests ops_src; Coordination::Requests ops_src;
@ -6787,11 +6792,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.front()).path_created; log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.front()).path_created;
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
if (query_context->getSettingsRef().replication_alter_partitions_sync > 1)
{
lock1.reset(); lock1.reset();
waitForAllReplicasToProcessLogEntry(entry_delete); waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
}
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
cleanLastPartNode(partition_id); cleanLastPartNode(partition_id);
@ -7540,6 +7542,9 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
out.writePrefix(); out.writePrefix();
out.write(block); out.write(block);
/// TODO(ab): What projections should we add to the empty part? How can we make sure that it
/// won't block future merges? Perhaps we should also check part emptiness when selecting parts
/// to merge.
out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert); out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);
try try

View File

@ -635,22 +635,27 @@ private:
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const; const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
/** Wait until all replicas, including this, execute the specified action from the log. /** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica . * If replicas are added at the same time, it can not wait the added replica.
*
* Waits for inactive replicas no more than wait_for_inactive_timeout.
* Returns list of inactive replicas that have not executed entry or throws exception.
* *
* NOTE: This method must be called without table lock held. * NOTE: This method must be called without table lock held.
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock. * Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
* TODO: There are wrong usages of this method that are not fixed yet.
*
* One method for convenient use on current table, another for waiting on foreign shards.
*/ */
Strings waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Strings waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); Int64 wait_for_inactive_timeout, const String & error_context = {});
Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Int64 wait_for_inactive_timeout);
/** Wait until the specified replica executes the specified action from the log. /** Wait until the specified replica executes the specified action from the log.
* NOTE: See comment about locks above. * NOTE: See comment about locks above.
*/ */
bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name,
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);
/// Depending on settings, do nothing or wait for this replica or all replicas process log entry.
void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {});
/// Throw an exception if the table is readonly. /// Throw an exception if the table is readonly.
void assertNotReadonly() const; void assertNotReadonly() const;

View File

@ -75,7 +75,9 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"rows_where_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, {"rows_where_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())}, {"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())} {"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"projections", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
} }
) )
{ {
@ -253,6 +255,13 @@ void StorageSystemParts::processNextStorage(
add_ttl_info_map(part->ttl_infos.group_by_ttl); add_ttl_info_map(part->ttl_infos.group_by_ttl);
add_ttl_info_map(part->ttl_infos.rows_where_ttl); add_ttl_info_map(part->ttl_infos.rows_where_ttl);
Array projections;
for (const auto & [name, _] : part->getProjectionParts())
projections.push_back(name);
if (columns_mask[src_index++])
columns[res_index++]->insert(projections);
/// _state column should be the latest. /// _state column should be the latest.
/// Do not use part->getState*, it can be changed from different thread /// Do not use part->getState*, it can be changed from different thread
if (has_state_column) if (has_state_column)

View File

@ -0,0 +1,6 @@
<yandex>
<merge_tree>
<!-- 10 seconds (default is 1 minute) -->
<zookeeper_session_expiration_check_period>10</zookeeper_session_expiration_check_period>
</merge_tree>
</yandex>

View File

@ -1,5 +1,6 @@
<yandex> <yandex>
<top_level_domains_lists> <top_level_domains_lists>
<public_suffix_list>public_suffix_list.dat</public_suffix_list> <public_suffix_list>public_suffix_list.dat</public_suffix_list>
<no_new_line_list>no_new_line_list.dat</no_new_line_list>
</top_level_domains_lists> </top_level_domains_lists>
</yandex> </yandex>

View File

@ -28,6 +28,7 @@ ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/

View File

@ -0,0 +1 @@
foo.bar

View File

@ -6,6 +6,8 @@
<http_receive_timeout>60</http_receive_timeout> <http_receive_timeout>60</http_receive_timeout>
<!-- 1 minute (default is 10 minutes) --> <!-- 1 minute (default is 10 minutes) -->
<insert_quorum_timeout>60000</insert_quorum_timeout> <insert_quorum_timeout>60000</insert_quorum_timeout>
<!-- 30 seconds (default is 2 minutes) -->
<replication_wait_for_inactive_replica_timeout>30</replication_wait_for_inactive_replica_timeout>
</default> </default>
</profiles> </profiles>
</yandex> </yandex>

View File

@ -218,6 +218,10 @@ def test_watchers(started_cluster):
print("Fake data", fake_data_watch_data) print("Fake data", fake_data_watch_data)
assert genuine_data_watch_data == fake_data_watch_data assert genuine_data_watch_data == fake_data_watch_data
genuine_zk.create("/test_data_watches/child", b"a")
fake_zk.create("/test_data_watches/child", b"a")
genuine_children = None genuine_children = None
def genuine_child_callback(event): def genuine_child_callback(event):
print("Genuine child watch called") print("Genuine child watch called")
@ -233,16 +237,74 @@ def test_watchers(started_cluster):
genuine_zk.get_children("/test_data_watches", watch=genuine_child_callback) genuine_zk.get_children("/test_data_watches", watch=genuine_child_callback)
fake_zk.get_children("/test_data_watches", watch=fake_child_callback) fake_zk.get_children("/test_data_watches", watch=fake_child_callback)
print("Calling non related genuine child")
genuine_zk.set("/test_data_watches/child", b"q")
genuine_zk.set("/test_data_watches", b"q")
print("Calling non related fake child")
fake_zk.set("/test_data_watches/child", b"q")
fake_zk.set("/test_data_watches", b"q")
time.sleep(3)
assert genuine_children == None
assert fake_children == None
print("Calling genuine child") print("Calling genuine child")
genuine_zk.create("/test_data_watches/child", b"b") genuine_zk.create("/test_data_watches/child_new", b"b")
print("Calling fake child") print("Calling fake child")
fake_zk.create("/test_data_watches/child", b"b") fake_zk.create("/test_data_watches/child_new", b"b")
time.sleep(3) time.sleep(3)
print("Genuine children", genuine_children) print("Genuine children", genuine_children)
print("Fake children", fake_children) print("Fake children", fake_children)
assert genuine_children == fake_children assert genuine_children == fake_children
genuine_children_delete = None
def genuine_child_delete_callback(event):
print("Genuine child watch called")
nonlocal genuine_children_delete
genuine_children_delete = event
fake_children_delete = None
def fake_child_delete_callback(event):
print("Fake child watch called")
nonlocal fake_children_delete
fake_children_delete = event
genuine_child_delete = None
def genuine_own_delete_callback(event):
print("Genuine child watch called")
nonlocal genuine_child_delete
genuine_child_delete = event
fake_child_delete = None
def fake_own_delete_callback(event):
print("Fake child watch called")
nonlocal fake_child_delete
fake_child_delete = event
genuine_zk.get_children("/test_data_watches", watch=genuine_child_delete_callback)
fake_zk.get_children("/test_data_watches", watch=fake_child_delete_callback)
genuine_zk.get_children("/test_data_watches/child", watch=genuine_own_delete_callback)
fake_zk.get_children("/test_data_watches/child", watch=fake_own_delete_callback)
print("Calling genuine child delete")
genuine_zk.delete("/test_data_watches/child")
print("Calling fake child delete")
fake_zk.delete("/test_data_watches/child")
time.sleep(3)
print("Genuine children delete", genuine_children_delete)
print("Fake children delete", fake_children_delete)
assert genuine_children_delete == fake_children_delete
print("Genuine child delete", genuine_child_delete)
print("Fake child delete", fake_child_delete)
assert genuine_child_delete == fake_child_delete
finally: finally:
for zk in [genuine_zk, fake_zk]: for zk in [genuine_zk, fake_zk]:
stop_zk(zk) stop_zk(zk)

View File

@ -12,6 +12,7 @@ SYSTEM SYNC REPLICA byte_identical_r2;
ALTER TABLE byte_identical_r1 ADD COLUMN y UInt64 DEFAULT rand(); ALTER TABLE byte_identical_r1 ADD COLUMN y UInt64 DEFAULT rand();
SYSTEM SYNC REPLICA byte_identical_r1; SYSTEM SYNC REPLICA byte_identical_r1;
SYSTEM SYNC REPLICA byte_identical_r2; SYSTEM SYNC REPLICA byte_identical_r2;
SET replication_alter_partitions_sync=2;
OPTIMIZE TABLE byte_identical_r1 PARTITION tuple() FINAL; OPTIMIZE TABLE byte_identical_r1 PARTITION tuple() FINAL;
SELECT x, t1.y - t2.y FROM byte_identical_r1 t1 SEMI LEFT JOIN byte_identical_r2 t2 USING x ORDER BY x; SELECT x, t1.y - t2.y FROM byte_identical_r1 t1 SEMI LEFT JOIN byte_identical_r2 t2 USING x ORDER BY x;

View File

@ -1,9 +1,9 @@
log log
Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0 ::1 Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0
Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0 ::1 Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0 ::1 Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0 ::1 Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0 ::1 Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
parts parts
Request 0 Multi 0 0 \N 5 0 \N \N \N 0 0 0 0 Request 0 Multi 0 0 \N 5 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0 Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0

View File

@ -1,12 +1,14 @@
drop table if exists rmt; drop table if exists rmt;
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n; -- cleanup code will perform extra Exists
-- (so the .reference will not match)
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400;
system sync replica rmt; system sync replica rmt;
insert into rmt values (1); insert into rmt values (1);
insert into rmt values (1); insert into rmt values (1);
system flush logs; system flush logs;
select 'log'; select 'log';
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type, select address, type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/log%' and op_num not in (3, 4, 12) from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/log%' and op_num not in (3, 4, 12)
order by xid, type, request_idx; order by xid, type, request_idx;

View File

@ -98,6 +98,7 @@ SELECT '*** disable the feature';
ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
SET replication_alter_partitions_sync=2;
/* all_0_0_6 - we disabled the feature, both replicas will merge */ /* all_0_0_6 - we disabled the feature, both replicas will merge */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL; OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
/* all_0_0_7 - same */ /* all_0_0_7 - same */

View File

@ -28,3 +28,7 @@ foo
-- vector -- vector
xx.blogspot.co.at xx.blogspot.co.at
-- no new line
foo.bar
a.foo.bar
foo.baz

View File

@ -37,3 +37,8 @@ select cutToFirstSignificantSubdomainCustom('http://www.foo', 'public_suffix_lis
select '-- vector'; select '-- vector';
select cutToFirstSignificantSubdomainCustom('http://xx.blogspot.co.at/' || toString(number), 'public_suffix_list') from numbers(1); select cutToFirstSignificantSubdomainCustom('http://xx.blogspot.co.at/' || toString(number), 'public_suffix_list') from numbers(1);
select cutToFirstSignificantSubdomainCustom('there-is-no-such-domain' || toString(number), 'public_suffix_list') from numbers(1); select cutToFirstSignificantSubdomainCustom('there-is-no-such-domain' || toString(number), 'public_suffix_list') from numbers(1);
select '-- no new line';
select cutToFirstSignificantSubdomainCustom('foo.bar', 'no_new_line_list');
select cutToFirstSignificantSubdomainCustom('a.foo.bar', 'no_new_line_list');
select cutToFirstSignificantSubdomainCustom('a.foo.baz', 'no_new_line_list');

View File

@ -3,11 +3,11 @@ SELECT 01856_test_function_0(2, 3, 4);
SELECT isConstant(01856_test_function_0(1, 2, 3)); SELECT isConstant(01856_test_function_0(1, 2, 3));
DROP FUNCTION 01856_test_function_0; DROP FUNCTION 01856_test_function_0;
CREATE FUNCTION 01856_test_function_1 AS (a, b) -> a || b || c; --{serverError 47} CREATE FUNCTION 01856_test_function_1 AS (a, b) -> a || b || c; --{serverError 47}
CREATE FUNCTION 01856_test_function_1 AS (a, b) -> 01856_test_function_1(a, b) + 01856_test_function_1(a, b); --{serverError 600} CREATE FUNCTION 01856_test_function_1 AS (a, b) -> 01856_test_function_1(a, b) + 01856_test_function_1(a, b); --{serverError 611}
CREATE FUNCTION cast AS a -> a + 1; --{serverError 598} CREATE FUNCTION cast AS a -> a + 1; --{serverError 609}
CREATE FUNCTION sum AS (a, b) -> a + b; --{serverError 598} CREATE FUNCTION sum AS (a, b) -> a + b; --{serverError 609}
CREATE FUNCTION 01856_test_function_2 AS (a, b) -> a + b; CREATE FUNCTION 01856_test_function_2 AS (a, b) -> a + b;
CREATE FUNCTION 01856_test_function_2 AS (a) -> a || '!!!'; --{serverError 598} CREATE FUNCTION 01856_test_function_2 AS (a) -> a || '!!!'; --{serverError 609}
DROP FUNCTION 01856_test_function_2; DROP FUNCTION 01856_test_function_2;
DROP FUNCTION unknown_function; -- {serverError 46} DROP FUNCTION unknown_function; -- {serverError 46}
DROP FUNCTION CAST; -- {serverError 599} DROP FUNCTION CAST; -- {serverError 610}

View File

@ -0,0 +1,5 @@
20
20
20
20
20

View File

@ -0,0 +1,22 @@
DROP TABLE IF EXISTS short;
DROP TABLE IF EXISTS long;
DROP TABLE IF EXISTS merged;
CREATE TABLE short (e Int64, t DateTime ) ENGINE = MergeTree PARTITION BY e ORDER BY t;
CREATE TABLE long (e Int64, t DateTime ) ENGINE = MergeTree PARTITION BY (e, toStartOfMonth(t)) ORDER BY t;
insert into short select number % 11, toDateTime('2021-01-01 00:00:00') + number from numbers(1000);
insert into long select number % 11, toDateTime('2021-01-01 00:00:00') + number from numbers(1000);
CREATE TABLE merged as short ENGINE = Merge(currentDatabase(), 'short|long');
select sum(e) from (select * from merged order by t limit 10) SETTINGS optimize_read_in_order = 0;
select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 1;
select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 3;
select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 10;
select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 50;
DROP TABLE IF EXISTS short;
DROP TABLE IF EXISTS long;
DROP TABLE IF EXISTS merged;

View File

@ -0,0 +1,4 @@
0 \N
0 \N
0 \N

View File

@ -0,0 +1,3 @@
set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '1', join_algorithm = 'partial_merge';
SELECT * FROM (SELECT dummy AS val FROM system.one) AS s1 ANY LEFT JOIN (SELECT toLowCardinality(dummy) AS rval FROM system.one) AS s2 ON (val + 9223372036854775806) = (rval * 1);

View File

@ -222,3 +222,10 @@ find $ROOT_PATH/{src,base,programs,utils,tests,docs,website,cmake} -name '*.md'
# Forbid subprocess.check_call(...) in integration tests because it does not provide enough information on errors # Forbid subprocess.check_call(...) in integration tests because it does not provide enough information on errors
find $ROOT_PATH'/tests/integration' -name '*.py' | find $ROOT_PATH'/tests/integration' -name '*.py' |
xargs grep -F 'subprocess.check_call' | grep -v "STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL" && echo "Use helpers.cluster.run_and_check or subprocess.run instead of subprocess.check_call to print detailed info on error" xargs grep -F 'subprocess.check_call' | grep -v "STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL" && echo "Use helpers.cluster.run_and_check or subprocess.run instead of subprocess.check_call to print detailed info on error"
# Forbid non-unique error codes
if [[ "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | wc -l)" != "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | sort | uniq | wc -l)" ]]
then
echo "ErrorCodes.cpp contains non-unique error codes"
fi