Merge branch 'master' into async-reads

Author: Alexey Milovidov · 2021-08-31 02:52:58 +03:00
commit ca6f6306f3
118 changed files with 1927 additions and 241 deletions


@@ -98,6 +98,8 @@ RUN set -x \
     && echo 'dockremap:165536:65536' >> /etc/subuid \
     && echo 'dockremap:165536:65536' >> /etc/subgid
+RUN echo '127.0.0.1 localhost test.com' >> /etc/hosts
 EXPOSE 2375
 ENTRYPOINT ["dockerd-entrypoint.sh"]
 CMD ["sh", "-c", "pytest $PYTEST_OPTS"]


@@ -0,0 +1,11 @@
version: '2.3'

services:
  # nginx server to host static files.
  # Accepts only PUT requests to test.com/path and GET requests for already existing data at test.com/path.
  # Files will be put into /usr/share/nginx/files.
  nginx:
    image: kssenii/nginx-test:1.1
    restart: always
    ports:
      - 80:80
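For illustration only, not part of the commit: a minimal HTTP round trip against this container, using Poco (already a ClickHouse dependency). The path `/data/sample.txt` and the payload are made up, and the sketch assumes `test.com` resolves to the container, as arranged by the `/etc/hosts` change above.

```cpp
#include <iostream>
#include <string>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPMessage.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/StreamCopier.h>

int main()
{
    // The compose file maps port 80; /etc/hosts maps test.com to 127.0.0.1.
    Poco::Net::HTTPClientSession session("test.com", 80);

    // PUT a small payload to a hypothetical path.
    std::string body = "hello from the integration test";
    Poco::Net::HTTPRequest put(Poco::Net::HTTPRequest::HTTP_PUT, "/data/sample.txt", Poco::Net::HTTPMessage::HTTP_1_1);
    put.setContentLength(static_cast<std::streamsize>(body.size()));
    session.sendRequest(put) << body;
    Poco::Net::HTTPResponse put_response;
    session.receiveResponse(put_response);
    std::cout << "PUT status: " << put_response.getStatus() << "\n";

    // GET the same path back.
    Poco::Net::HTTPRequest get(Poco::Net::HTTPRequest::HTTP_GET, "/data/sample.txt", Poco::Net::HTTPMessage::HTTP_1_1);
    session.sendRequest(get);
    Poco::Net::HTTPResponse get_response;
    std::istream & rs = session.receiveResponse(get_response);
    std::string round_trip;
    Poco::StreamCopier::copyToString(rs, round_trip);
    std::cout << "GET body: " << round_trip << "\n";
}
```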


@@ -58,7 +58,7 @@ function start()
         echo "Cannot start clickhouse-server"
         cat /var/log/clickhouse-server/stdout.log
         tail -n1000 /var/log/clickhouse-server/stderr.log
-        tail -n100000 /var/log/clickhouse-server/clickhouse-server.log | grep -F -v '<Warning> RaftInstance:' -e '<Information> RaftInstance' | tail -n1000
+        tail -n100000 /var/log/clickhouse-server/clickhouse-server.log | grep -F -v -e '<Warning> RaftInstance:' -e '<Information> RaftInstance' | tail -n1000
         break
     fi
     # use root to match with current uid


@@ -44,4 +44,10 @@ Restrictions:
 - some data types are sent as strings
+
+To cancel a long query, use the `KILL QUERY connection_id` statement (it is replaced with `KILL QUERY WHERE query_id = connection_id` during execution). For example:
+
+``` bash
+$ mysql --protocol tcp -h mysql_server -P 9004 default -u default --password=123 -e "KILL QUERY 123456;"
+```
 [Original article](https://clickhouse.tech/docs/en/interfaces/mysql/) <!--hide-->


@@ -2236,6 +2236,53 @@ defaultRoles()
 Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
+
+## getServerPort {#getserverport}
+
+Returns the number of the server port. When the port is not used by the server, throws an exception.
+
+**Syntax**
+
+``` sql
+getServerPort(port_name)
+```
+
+**Arguments**
+
+- `port_name` — The name of the server port. [String](../../sql-reference/data-types/string.md#string). Possible values:
+    - 'tcp_port'
+    - 'tcp_port_secure'
+    - 'http_port'
+    - 'https_port'
+    - 'interserver_http_port'
+    - 'interserver_https_port'
+    - 'mysql_port'
+    - 'postgresql_port'
+    - 'grpc_port'
+    - 'prometheus.port'
+
+**Returned value**
+
+- The number of the server port.
+
+Type: [UInt16](../../sql-reference/data-types/int-uint.md).
+
+**Example**
+
+Query:
+
+``` sql
+SELECT getServerPort('tcp_port');
+```
+
+Result:
+
+``` text
+┌─getServerPort('tcp_port')─┐
+│                      9000 │
+└───────────────────────────┘
+```
 ## queryID {#query-id}
 Returns the ID of the current query. Other parameters of a query can be extracted from the [system.query_log](../../operations/system-tables/query_log.md) table via `query_id`.


@@ -43,3 +43,9 @@ mysql>
 - prepared queries are not supported
 - some data types are sent as strings
+
+To cancel a long-running query, use the `KILL QUERY connection_id` statement (it is replaced with `KILL QUERY WHERE query_id = connection_id` during execution). For example:
+
+``` bash
+$ mysql --protocol tcp -h mysql_server -P 9004 default -u default --password=123 -e "KILL QUERY 123456;"
+```


@@ -2186,6 +2186,53 @@ defaultRoles()
 Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
+
+## getServerPort {#getserverport}
+
+Returns the server port number. When the port is not used by the server, throws an exception.
+
+**Syntax**
+
+``` sql
+getServerPort(port_name)
+```
+
+**Arguments**
+
+- `port_name` — the name of the server port. [String](../../sql-reference/data-types/string.md#string). Possible values:
+    - 'tcp_port'
+    - 'tcp_port_secure'
+    - 'http_port'
+    - 'https_port'
+    - 'interserver_http_port'
+    - 'interserver_https_port'
+    - 'mysql_port'
+    - 'postgresql_port'
+    - 'grpc_port'
+    - 'prometheus.port'
+
+**Returned value**
+
+- The server port number.
+
+Type: [UInt16](../../sql-reference/data-types/int-uint.md).
+
+**Example**
+
+Query:
+
+``` sql
+SELECT getServerPort('tcp_port');
+```
+
+Result:
+
+``` text
+┌─getServerPort('tcp_port')─┐
+│                      9000 │
+└───────────────────────────┘
+```
 ## queryID {#query-id}
 Returns the ID of the current query. Other parameters of a query can be extracted from the [system.query_log](../../operations/system-tables/query_log.md) system table via `query_id`.


@@ -45,9 +45,9 @@ option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Li
     ${ENABLE_CLICKHOUSE_ALL})
 # https://presentations.clickhouse.tech/matemarketing_2020/
-option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories"
-    ${ENABLE_CLICKHOUSE_ALL})
+option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories" ${ENABLE_CLICKHOUSE_ALL})
+option (ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER "A tool to export table data files to be later put to a static files web server" ${ENABLE_CLICKHOUSE_ALL})
 option (ENABLE_CLICKHOUSE_KEEPER "ClickHouse alternative to ZooKeeper" ${ENABLE_CLICKHOUSE_ALL})
@@ -227,6 +227,7 @@ add_subdirectory (obfuscator)
 add_subdirectory (install)
 add_subdirectory (git-import)
 add_subdirectory (bash-completion)
+add_subdirectory (static-files-disk-uploader)
 if (ENABLE_CLICKHOUSE_KEEPER)
     add_subdirectory (keeper)
@@ -258,7 +259,8 @@ if (CLICKHOUSE_ONE_SHARED)
         ${CLICKHOUSE_GIT_IMPORT_SOURCES}
         ${CLICKHOUSE_ODBC_BRIDGE_SOURCES}
         ${CLICKHOUSE_KEEPER_SOURCES}
-        ${CLICKHOUSE_KEEPER_CONVERTER_SOURCES})
+        ${CLICKHOUSE_KEEPER_CONVERTER_SOURCES}
+        ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_SOURCES})
     target_link_libraries(clickhouse-lib
         ${CLICKHOUSE_SERVER_LINK}
@@ -273,7 +275,8 @@ if (CLICKHOUSE_ONE_SHARED)
         ${CLICKHOUSE_GIT_IMPORT_LINK}
         ${CLICKHOUSE_ODBC_BRIDGE_LINK}
         ${CLICKHOUSE_KEEPER_LINK}
-        ${CLICKHOUSE_KEEPER_CONVERTER_LINK})
+        ${CLICKHOUSE_KEEPER_CONVERTER_LINK}
+        ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_LINK})
     target_include_directories(clickhouse-lib
         ${CLICKHOUSE_SERVER_INCLUDE}
@@ -306,6 +309,7 @@ if (CLICKHOUSE_SPLIT_BINARY)
     clickhouse-obfuscator
     clickhouse-git-import
     clickhouse-copier
+    clickhouse-static-files-disk-uploader
 )
 if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
@@ -371,6 +375,9 @@ else ()
     if (ENABLE_CLICKHOUSE_GIT_IMPORT)
         clickhouse_target_link_split_lib(clickhouse git-import)
     endif ()
+    if (ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER)
+        clickhouse_target_link_split_lib(clickhouse static-files-disk-uploader)
+    endif ()
     if (ENABLE_CLICKHOUSE_KEEPER)
         clickhouse_target_link_split_lib(clickhouse keeper)
     endif()
@@ -432,6 +439,11 @@ else ()
         install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-git-import" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
         list(APPEND CLICKHOUSE_BUNDLE clickhouse-git-import)
     endif ()
+    if (ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER)
+        add_custom_target (clickhouse-static-files-disk-uploader ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-static-files-disk-uploader DEPENDS clickhouse)
+        install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-static-files-disk-uploader" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
+        list(APPEND CLICKHOUSE_BUNDLE clickhouse-static-files-disk-uploader)
+    endif ()
     if (ENABLE_CLICKHOUSE_KEEPER)
         add_custom_target (clickhouse-keeper ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-keeper DEPENDS clickhouse)
         install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-keeper" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)


@@ -18,3 +18,4 @@
 #cmakedefine01 ENABLE_CLICKHOUSE_LIBRARY_BRIDGE
 #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER
 #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER
+#cmakedefine01 ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER


@@ -62,6 +62,9 @@ int mainEntryClickHouseKeeper(int argc, char ** argv);
 #if ENABLE_CLICKHOUSE_KEEPER
 int mainEntryClickHouseKeeperConverter(int argc, char ** argv);
 #endif
+#if ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER
+int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv);
+#endif
 #if ENABLE_CLICKHOUSE_INSTALL
 int mainEntryClickHouseInstall(int argc, char ** argv);
 int mainEntryClickHouseStart(int argc, char ** argv);
@@ -131,6 +134,9 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
     {"stop", mainEntryClickHouseStop},
     {"status", mainEntryClickHouseStatus},
     {"restart", mainEntryClickHouseRestart},
+#endif
+#if ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER
+    {"static-files-disk-uploader", mainEntryClickHouseStaticFilesDiskUploader},
 #endif
     {"hash-binary", mainEntryClickHouseHashBinary},
 };


@@ -0,0 +1,9 @@
set (CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_SOURCES static-files-disk-uploader.cpp)

set (CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_LINK
    PRIVATE
    boost::program_options
    dbms
)

clickhouse_program_add(static-files-disk-uploader)


@@ -0,0 +1,2 @@
int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseStaticFilesDiskUploader(argc_, argv_); }


@@ -0,0 +1,162 @@
#include <Common/Exception.h>
#include <Common/TerminalSize.h>

#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <IO/createReadBufferFromFileBase.h>

#include <boost/program_options.hpp>
#include <re2/re2.h>
#include <filesystem>

namespace fs = std::filesystem;

#define UUID_PATTERN "[\\w]{8}-[\\w]{4}-[\\w]{4}-[\\w]{4}-[\\w]{12}"
#define EXTRACT_UUID_PATTERN fmt::format(".*/({})/.*", UUID_PATTERN)

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}

/*
 * A tool to collect table data files on a local fs as is (into the current directory or into the path from the --output-dir option).
 * If the --test-mode option is given, files are uploaded to the given URL via PUT requests instead.
 */
void processTableFiles(const fs::path & path, const String & files_prefix, String uuid,
    WriteBuffer & metadata_buf, std::function<std::shared_ptr<WriteBuffer>(const String &)> create_dst_buf)
{
    fs::directory_iterator dir_end;
    auto process_file = [&](const String & file_name, const String & file_path)
    {
        auto remote_file_name = files_prefix + "-" + uuid + "-" + file_name;
        writeText(remote_file_name, metadata_buf);
        writeChar('\t', metadata_buf);
        writeIntText(fs::file_size(file_path), metadata_buf);
        writeChar('\n', metadata_buf);

        auto src_buf = createReadBufferFromFileBase(file_path, {}, fs::file_size(file_path));
        auto dst_buf = create_dst_buf(remote_file_name);

        copyData(*src_buf, *dst_buf);
        dst_buf->next();
        dst_buf->finalize();
    };

    for (fs::directory_iterator dir_it(path); dir_it != dir_end; ++dir_it)
    {
        if (dir_it->is_directory())
        {
            fs::directory_iterator files_end;
            for (fs::directory_iterator file_it(dir_it->path()); file_it != files_end; ++file_it)
                process_file(dir_it->path().filename().string() + "-" + file_it->path().filename().string(), file_it->path());
        }
        else
        {
            process_file(dir_it->path().filename(), dir_it->path());
        }
    }
}

}

int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv)
try
{
    using namespace DB;
    namespace po = boost::program_options;

    po::options_description description("Allowed options", getTerminalWidth());
    description.add_options()
        ("help,h", "produce help message")
        ("metadata-path", po::value<std::string>(), "Metadata path (SELECT data_paths FROM system.tables WHERE name = 'table_name')")
        ("test-mode", "Use test mode, which will put data on a given url via PUT")
        ("url", po::value<std::string>(), "Web server url for test mode")
        ("output-dir", po::value<std::string>(), "Directory to put files in non-test mode")
        ("files-prefix", po::value<std::string>(), "Prefix for stored files");

    po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
    po::variables_map options;
    po::store(parsed, options);
    po::notify(options);

    if (options.empty() || options.count("help"))
    {
        std::cout << description << std::endl;
        exit(0);
    }

    String url, metadata_path, files_prefix;

    if (options.count("metadata-path"))
        metadata_path = options["metadata-path"].as<std::string>();
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "No metadata-path option passed");

    if (options.count("files-prefix"))
        files_prefix = options["files-prefix"].as<std::string>();
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "No files-prefix option passed");

    fs::path fs_path = fs::weakly_canonical(metadata_path);
    if (!fs::exists(fs_path))
    {
        std::cerr << fmt::format("Data path ({}) does not exist", fs_path.string());
        return 1;
    }

    String uuid;
    if (!RE2::Extract(metadata_path, EXTRACT_UUID_PATTERN, "\\1", &uuid))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract uuid for: {}", metadata_path);

    std::shared_ptr<WriteBuffer> metadata_buf;
    std::function<std::shared_ptr<WriteBuffer>(const String &)> create_dst_buf;
    String root_path;

    if (options.count("test-mode"))
    {
        if (options.count("url"))
            url = options["url"].as<std::string>();
        else
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "No url option passed for test mode");

        metadata_buf = std::make_shared<WriteBufferFromHTTP>(Poco::URI(fs::path(url) / (".index-" + uuid)), Poco::Net::HTTPRequest::HTTP_PUT);

        create_dst_buf = [&](const String & remote_file_name)
        {
            return std::make_shared<WriteBufferFromHTTP>(Poco::URI(fs::path(url) / remote_file_name), Poco::Net::HTTPRequest::HTTP_PUT);
        };
    }
    else
    {
        if (options.count("output-dir"))
            root_path = options["output-dir"].as<std::string>();
        else
            root_path = fs::current_path();

        metadata_buf = std::make_shared<WriteBufferFromFile>(fs::path(root_path) / (".index-" + uuid));

        create_dst_buf = [&](const String & remote_file_name)
        {
            return std::make_shared<WriteBufferFromFile>(fs::path(root_path) / remote_file_name);
        };
    }

    processTableFiles(fs_path, files_prefix, uuid, *metadata_buf, create_dst_buf);
    metadata_buf->next();
    metadata_buf->finalize();

    return 0;
}
catch (...)
{
    std::cerr << DB::getCurrentExceptionMessage(false);
    return 1;
}
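For illustration only, not part of the commit: each line of the `.index-<uuid>` file written by `processTableFiles` above is `<files_prefix>-<uuid>-<file name>`, a tab, and the file size. A self-contained sketch of writing and parsing that record format, with hypothetical file names and sizes and plain iostreams standing in for ClickHouse's WriteBuffer/ReadBuffer helpers:

```cpp
#include <iostream>
#include <sstream>
#include <string>

int main()
{
    // Write two hypothetical records the way the uploader would.
    std::ostringstream index;
    index << "data-123e4567-e89b-12d3-a456-426614174000-all_1_1_0-checksums.txt" << '\t' << 261 << '\n';
    index << "data-123e4567-e89b-12d3-a456-426614174000-format_version.txt" << '\t' << 1 << '\n';

    // Read them back: name, tab, size, newline (names contain no spaces,
    // so whitespace-delimited extraction is enough for this sketch).
    std::istringstream in(index.str());
    std::string name;
    size_t size = 0;
    while (in >> name >> size)
        std::cout << "file: " << name << ", size: " << size << '\n';
}
```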


@@ -23,6 +23,7 @@ SRCS(
     client/QueryFuzzer.cpp
     client/ConnectionParameters.cpp
     client/Suggest.cpp
+    client/TestHint.cpp
     extract-from-config/ExtractFromConfig.cpp
     server/Server.cpp
     server/MetricsTransmitter.cpp


@@ -8,7 +8,7 @@ PEERDIR(
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -48,6 +48,8 @@ struct AggregateFunctionSequenceMatchData final
     bool sorted = true;
     PODArrayWithStackMemory<TimestampEvents, 64> events_list;
+    /// sequenceMatch conditions met at least once in events_list
+    std::bitset<max_events> conditions_met;

     void add(const Timestamp timestamp, const Events & events)
     {
@@ -56,6 +58,7 @@ struct AggregateFunctionSequenceMatchData final
         {
             events_list.emplace_back(timestamp, events);
             sorted = false;
+            conditions_met |= events;
         }
     }
@@ -64,29 +67,9 @@ struct AggregateFunctionSequenceMatchData final
         if (other.events_list.empty())
             return;

-        const auto size = events_list.size();
-
         events_list.insert(std::begin(other.events_list), std::end(other.events_list));
+        sorted = false;
+        conditions_met |= other.conditions_met;
-
-        /// either sort whole container or do so partially merging ranges afterwards
-        if (!sorted && !other.sorted)
-            std::sort(std::begin(events_list), std::end(events_list), Comparator{});
-        else
-        {
-            const auto begin = std::begin(events_list);
-            const auto middle = std::next(begin, size);
-            const auto end = std::end(events_list);
-
-            if (!sorted)
-                std::sort(begin, middle, Comparator{});
-
-            if (!other.sorted)
-                std::sort(middle, end, Comparator{});
-
-            std::inplace_merge(begin, middle, end, Comparator{});
-        }
-
-        sorted = true;
     }

     void sort()
@@ -290,6 +273,7 @@ private:
             dfa_states.back().transition = DFATransition::SpecificEvent;
             dfa_states.back().event = event_number - 1;
             dfa_states.emplace_back();
+            conditions_in_pattern.set(event_number - 1);
         }

         if (!match(")"))
@@ -518,6 +502,64 @@ protected:
         return action_it == action_end;
     }

+    /// Splits the pattern into deterministic parts separated by non-deterministic fragments
+    /// (time constraints and Kleene stars), and tries to match the deterministic parts in their specified order,
+    /// ignoring the non-deterministic fragments.
+    /// This function can quickly check that a full match is not possible if some deterministic fragment is missing.
+    template <typename EventEntry>
+    bool couldMatchDeterministicParts(const EventEntry events_begin, const EventEntry events_end, bool limit_iterations = true) const
+    {
+        size_t events_processed = 0;
+        auto events_it = events_begin;
+
+        const auto actions_end = std::end(actions);
+        auto actions_it = std::begin(actions);
+        auto det_part_begin = actions_it;
+
+        auto match_deterministic_part = [&events_it, events_end, &events_processed, det_part_begin, actions_it, limit_iterations]()
+        {
+            auto events_it_init = events_it;
+            auto det_part_it = det_part_begin;
+
+            while (det_part_it != actions_it && events_it != events_end)
+            {
+                /// matching any event
+                if (det_part_it->type == PatternActionType::AnyEvent)
+                    ++events_it, ++det_part_it;
+
+                /// matching specific event
+                else
+                {
+                    if (events_it->second.test(det_part_it->extra))
+                        ++events_it, ++det_part_it;
+
+                    /// abandon current matching, try to match the deterministic fragment further in the list
+                    else
+                    {
+                        events_it = ++events_it_init;
+                        det_part_it = det_part_begin;
+                    }
+                }
+
+                if (limit_iterations && ++events_processed > sequence_match_max_iterations)
+                    throw Exception{"Pattern application proves too difficult, exceeding max iterations (" + toString(sequence_match_max_iterations) + ")",
+                        ErrorCodes::TOO_SLOW};
+            }
+
+            return det_part_it == actions_it;
+        };
+
+        for (; actions_it != actions_end; ++actions_it)
+            if (actions_it->type != PatternActionType::SpecificEvent && actions_it->type != PatternActionType::AnyEvent)
+            {
+                if (!match_deterministic_part())
+                    return false;
+                det_part_begin = std::next(actions_it);
+            }
+
+        return match_deterministic_part();
+    }
+
 private:
     enum class DFATransition : char
     {
@@ -558,6 +600,8 @@ private:
 protected:
     /// `True` if the parsed pattern contains time assertions (?t...), `false` otherwise.
     bool pattern_has_time;
+    /// sequenceMatch conditions met at least once in the pattern
+    std::bitset<max_events> conditions_in_pattern;

 private:
     std::string pattern;
@@ -584,6 +628,12 @@ public:
     void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
     {
+        auto & output = assert_cast<ColumnUInt8 &>(to).getData();
+        if ((this->conditions_in_pattern & this->data(place).conditions_met) != this->conditions_in_pattern)
+        {
+            output.push_back(false);
+            return;
+        }
         this->data(place).sort();

         const auto & data_ref = this->data(place);
@@ -592,8 +642,10 @@ public:
         const auto events_end = std::end(data_ref.events_list);
         auto events_it = events_begin;

-        bool match = this->pattern_has_time ? this->backtrackingMatch(events_it, events_end) : this->dfaMatch(events_it, events_end);
-        assert_cast<ColumnUInt8 &>(to).getData().push_back(match);
+        bool match = (this->pattern_has_time ?
+            (this->couldMatchDeterministicParts(events_begin, events_end) && this->backtrackingMatch(events_it, events_end)) :
+            this->dfaMatch(events_it, events_end));
+        output.push_back(match);
     }
 };
@@ -614,8 +666,14 @@ public:
     void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
     {
+        auto & output = assert_cast<ColumnUInt64 &>(to).getData();
+        if ((this->conditions_in_pattern & this->data(place).conditions_met) != this->conditions_in_pattern)
+        {
+            output.push_back(0);
+            return;
+        }
         this->data(place).sort();
-        assert_cast<ColumnUInt64 &>(to).getData().push_back(count(place));
+        output.push_back(count(place));
     }

 private:
@@ -628,8 +686,12 @@ private:
         auto events_it = events_begin;

         size_t count = 0;
-        while (events_it != events_end && this->backtrackingMatch(events_it, events_end))
-            ++count;
+        // check if there is a chance of matching the sequence at least once
+        if (this->couldMatchDeterministicParts(events_begin, events_end))
+        {
+            while (events_it != events_end && this->backtrackingMatch(events_it, events_end))
+                ++count;
+        }

         return count;
     }
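For illustration only, not part of the commit: the early exit added to `insertResultInto` is a plain bitset-inclusion test — if some event referenced by the pattern was never met in the data, no full match is possible, so the expensive matching is skipped. A standalone sketch of that test (`max_events` and the event numbers are placeholders):

```cpp
#include <bitset>
#include <iostream>

constexpr size_t max_events = 32;

int main()
{
    std::bitset<max_events> conditions_in_pattern;  // events referenced by the pattern
    std::bitset<max_events> conditions_met;         // events seen at least once in the data

    // A pattern like (?1)(?2)(?3) references events 0, 1 and 2.
    conditions_in_pattern.set(0);
    conditions_in_pattern.set(1);
    conditions_in_pattern.set(2);

    // The aggregated data only ever satisfied conditions 0 and 2.
    conditions_met.set(0);
    conditions_met.set(2);

    // Same test as in insertResultInto: a full match is impossible unless
    // every condition in the pattern was met at least once.
    if ((conditions_in_pattern & conditions_met) != conditions_in_pattern)
        std::cout << "short-circuit: sequenceMatch must return 0\n";
    else
        std::cout << "need to run the actual matching\n";
}
```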


@@ -48,6 +48,7 @@ SRCS(
     AggregateFunctionSequenceNextNode.cpp
     AggregateFunctionSimpleLinearRegression.cpp
     AggregateFunctionSimpleState.cpp
+    AggregateFunctionSingleValueOrNull.cpp
     AggregateFunctionState.cpp
     AggregateFunctionStatistics.cpp
     AggregateFunctionStatisticsSimple.cpp


@@ -8,7 +8,7 @@ PEERDIR(
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F GroupBitmap | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -F GroupBitmap | sed 's/^\.\// /' | sort ?>
 )

 END()

src/Backups/ya.make (new file)

@@ -0,0 +1,27 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
OWNER(g:clickhouse)

LIBRARY()

PEERDIR(
    clickhouse/src/Common
)

SRCS(
    BackupEntryConcat.cpp
    BackupEntryFromAppendOnlyFile.cpp
    BackupEntryFromImmutableFile.cpp
    BackupEntryFromMemory.cpp
    BackupEntryFromSmallFile.cpp
    BackupFactory.cpp
    BackupInDirectory.cpp
    BackupRenamingConfig.cpp
    BackupSettings.cpp
    BackupUtils.cpp
    hasCompatibleDataToRestoreTable.cpp
    renameInCreateQuery.cpp
)

END()

src/Backups/ya.make.in (new file)

@@ -0,0 +1,14 @@
OWNER(g:clickhouse)

LIBRARY()

PEERDIR(
    clickhouse/src/Common
)

SRCS(
    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)

END()


@@ -8,7 +8,7 @@ PEERDIR(
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -9,7 +9,7 @@ PEERDIR(
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -17,7 +17,7 @@ PEERDIR(
 )

 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -2,11 +2,21 @@

 #include <queue>
 #include <type_traits>
+#include <atomic>

 #include <Poco/Mutex.h>
 #include <Poco/Semaphore.h>

 #include <common/MoveOrCopyIfThrow.h>
+#include <Common/Exception.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+}

 /** A very simple thread-safe queue of limited size.
   * If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty.
@@ -17,9 +27,41 @@ class ConcurrentBoundedQueue
 {
 private:
     std::queue<T> queue;
-    Poco::FastMutex mutex;
+    mutable Poco::FastMutex mutex;
     Poco::Semaphore fill_count;
     Poco::Semaphore empty_count;
+    std::atomic_bool closed = false;
+
+    template <typename... Args>
+    bool tryEmplaceImpl(Args &&... args)
+    {
+        bool emplaced = true;
+
+        {
+            Poco::ScopedLock<Poco::FastMutex> lock(mutex);
+            if (closed)
+                emplaced = false;
+            else
+                queue.emplace(std::forward<Args>(args)...);
+        }
+
+        if (emplaced)
+            fill_count.set();
+        else
+            empty_count.set();
+
+        return emplaced;
+    }
+
+    void popImpl(T & x)
+    {
+        {
+            Poco::ScopedLock<Poco::FastMutex> lock(mutex);
+            detail::moveOrCopyIfThrow(std::move(queue.front()), x);
+            queue.pop();
+        }
+
+        empty_count.set();
+    }

 public:
     explicit ConcurrentBoundedQueue(size_t max_fill)
@@ -30,91 +72,75 @@ public:
     void push(const T & x)
     {
         empty_count.wait();
-        {
-            Poco::ScopedLock<Poco::FastMutex> lock(mutex);
-            queue.push(x);
-        }
-        fill_count.set();
+        if (!tryEmplaceImpl(x))
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "tryPush/tryEmplace must be used with close()");
     }

     template <typename... Args>
     void emplace(Args &&... args)
     {
         empty_count.wait();
-        {
-            Poco::ScopedLock<Poco::FastMutex> lock(mutex);
-            queue.emplace(std::forward<Args>(args)...);
-        }
-        fill_count.set();
+        if (!tryEmplaceImpl(std::forward<Args>(args)...))
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "tryPush/tryEmplace must be used with close()");
     }

     void pop(T & x)
     {
         fill_count.wait();
-        {
-            Poco::ScopedLock<Poco::FastMutex> lock(mutex);
-            detail::moveOrCopyIfThrow(std::move(queue.front()), x);
-            queue.pop();
-        }
-        empty_count.set();
+        popImpl(x);
     }

     bool tryPush(const T & x, UInt64 milliseconds = 0)
     {
-        if (empty_count.tryWait(milliseconds))
-        {
-            {
-                Poco::ScopedLock<Poco::FastMutex> lock(mutex);
-                queue.push(x);
-            }
-            fill_count.set();
-            return true;
-        }
-        return false;
+        if (!empty_count.tryWait(milliseconds))
+            return false;
+
+        return tryEmplaceImpl(x);
     }

     template <typename... Args>
     bool tryEmplace(UInt64 milliseconds, Args &&... args)
     {
-        if (empty_count.tryWait(milliseconds))
-        {
-            {
-                Poco::ScopedLock<Poco::FastMutex> lock(mutex);
-                queue.emplace(std::forward<Args>(args)...);
-            }
-            fill_count.set();
-            return true;
-        }
-        return false;
+        if (!empty_count.tryWait(milliseconds))
+            return false;
+
+        return tryEmplaceImpl(std::forward<Args>(args)...);
     }

     bool tryPop(T & x, UInt64 milliseconds = 0)
     {
-        if (fill_count.tryWait(milliseconds))
-        {
-            {
-                Poco::ScopedLock<Poco::FastMutex> lock(mutex);
-                detail::moveOrCopyIfThrow(std::move(queue.front()), x);
-                queue.pop();
-            }
-            empty_count.set();
-            return true;
-        }
-        return false;
+        if (!fill_count.tryWait(milliseconds))
+            return false;
+
+        popImpl(x);
+        return true;
     }

-    size_t size()
+    size_t size() const
     {
         Poco::ScopedLock<Poco::FastMutex> lock(mutex);
         return queue.size();
     }

-    size_t empty()
+    size_t empty() const
     {
         Poco::ScopedLock<Poco::FastMutex> lock(mutex);
         return queue.empty();
     }

+    /// Forbid pushing new elements to the queue.
+    /// Returns false if the queue was not closed before the call, true if it was already closed.
+    bool close()
+    {
+        Poco::ScopedLock<Poco::FastMutex> lock(mutex);
+        return closed.exchange(true);
+    }
+
+    bool isClosed() const
+    {
+        return closed.load();
+    }
+
     void clear()
     {
         while (fill_count.tryWait(0))
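For illustration only, not part of the commit: a simplified standalone analogue of the `close()` contract above, built on `std::condition_variable` instead of Poco semaphores and without the bounded-capacity logic. It shows `push` refusing new elements once closed, while consumers drain the remaining elements before observing shutdown — the same invariant the ZooKeeper client below relies on.

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

template <typename T>
class ClosableQueue
{
    std::queue<T> queue;
    std::mutex mutex;
    std::condition_variable cv;
    bool closed = false;

public:
    bool push(T value)
    {
        {
            std::lock_guard lock(mutex);
            if (closed)
                return false;  // same contract as tryPush on a closed queue
            queue.push(std::move(value));
        }
        cv.notify_one();
        return true;
    }

    /// Blocks until an element arrives, or the queue is closed and drained.
    std::optional<T> pop()
    {
        std::unique_lock lock(mutex);
        cv.wait(lock, [&] { return closed || !queue.empty(); });
        if (queue.empty())
            return std::nullopt;
        T value = std::move(queue.front());
        queue.pop();
        return value;
    }

    void close()
    {
        {
            std::lock_guard lock(mutex);
            closed = true;
        }
        cv.notify_all();  // wake consumers so they can observe shutdown
    }
};

int main()
{
    ClosableQueue<int> queue;
    std::thread consumer([&]
    {
        while (auto value = queue.pop())
            std::cout << "got " << *value << '\n';
        std::cout << "queue closed\n";
    });

    queue.push(1);
    queue.push(2);
    queue.close();
    std::cout << std::boolalpha << "push after close: " << queue.push(3) << '\n';
    consumer.join();
}
```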


@@ -540,7 +540,7 @@ void ZooKeeper::sendThread()
     try
     {
-        while (!expired)
+        while (!requests_queue.isClosed())
         {
             auto prev_bytes_sent = out->count();
@@ -572,7 +572,7 @@ void ZooKeeper::sendThread()
                 info.request->has_watch = true;
             }

-            if (expired)
+            if (requests_queue.isClosed())
             {
                 break;
             }
@@ -617,7 +617,7 @@ void ZooKeeper::receiveThread()
     try
     {
         Int64 waited = 0;
-        while (!expired)
+        while (!requests_queue.isClosed())
         {
             auto prev_bytes_received = in->count();
@@ -640,7 +640,7 @@ void ZooKeeper::receiveThread()
             if (in->poll(max_wait))
             {
-                if (expired)
+                if (requests_queue.isClosed())
                     break;

                 receiveEvent();
@@ -840,12 +840,10 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
     auto expire_session_if_not_expired = [&]
     {
-        std::lock_guard lock(push_request_mutex);
-        if (!expired)
-        {
-            expired = true;
+        /// No new requests will appear in queue after close()
+        bool was_already_closed = requests_queue.close();
+        if (!was_already_closed)
             active_session_metric_increment.destroy();
-        }
     };

     try
@@ -1018,18 +1016,16 @@ void ZooKeeper::pushRequest(RequestInfo && info)
         }
     }

-    /// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
-    /// to avoid forgotten operations in the queue when session is expired.
-    /// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
-    /// and the queue will be drained in 'finalize'.
-    std::lock_guard lock(push_request_mutex);
-    if (expired)
+    if (requests_queue.isClosed())
         throw Exception("Session expired", Error::ZSESSIONEXPIRED);

     if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
+    {
+        if (requests_queue.isClosed())
+            throw Exception("Session expired", Error::ZSESSIONEXPIRED);
         throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
+    }
 }
 catch (...)
 {
     finalize(false, false);


@@ -121,7 +121,7 @@
     /// If expired, you can only destroy the object. All other methods will throw exception.
-    bool isExpired() const override { return expired; }
+    bool isExpired() const override { return requests_queue.isClosed(); }

     /// Useful to check owner of ephemeral node.
     int64_t getSessionID() const override { return session_id; }
@@ -207,11 +207,9 @@ private:
     int64_t session_id = 0;

     std::atomic<XID> next_xid {1};
-    std::atomic<bool> expired {false};
     /// Mark session finalization start. Used to avoid simultaneous
     /// finalization from different threads. One-shot flag.
     std::atomic<bool> finalization_started {false};
-    std::mutex push_request_mutex;

     using clock = std::chrono::steady_clock;
@@ -225,7 +223,7 @@ private:
     using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
-    RequestsQueue requests_queue{1};
+    RequestsQueue requests_queue{1024};

     void pushRequest(RequestInfo && info);

     using Operations = std::map<XID, RequestInfo>;


@@ -118,7 +118,6 @@ SRCS(
     isLocalAddress.cpp
     isValidUTF8.cpp
     malloc.cpp
-    memory.cpp
     new_delete.cpp
     parseAddress.cpp
     parseGlobs.cpp


@@ -24,7 +24,7 @@ INCLUDE(${ARCADIA_ROOT}/clickhouse/cmake/yandex/ya.make.versions.inc)
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -32,6 +32,7 @@ SRCS(
     CompressionCodecT64.cpp
     CompressionCodecZSTD.cpp
     CompressionFactory.cpp
+    CompressionFactoryAdditions.cpp
     ICompressionCodec.cpp
     LZ4_decompress_faster.cpp
     getCompressionCodecForFile.cpp


@@ -15,7 +15,7 @@ PEERDIR(
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -12,7 +12,6 @@
 #include <IO/LimitReadBuffer.h>

 #include <Processors/Pipe.h>
-#include <Processors/Sources/SinkToOutputStream.h>
 #include <Processors/Executors/PipelineExecutor.h>
 #include <Processors/Sources/SourceFromInputStream.h>
 #include <Processors/Sinks/SinkToStorage.h>


@@ -75,6 +75,7 @@ class IColumn;
     M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
     M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
     M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
+    M(UInt64, http_max_single_read_retries, 4, "The maximum number of retries during single http read.", 0) \
     M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
     M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
     M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \


@@ -37,6 +37,7 @@ SRCS(
     PostgreSQL/insertPostgreSQLValue.cpp
     PostgreSQLProtocol.cpp
     QueryProcessingStage.cpp
+    ServerUUID.cpp
     Settings.cpp
     SettingsEnums.cpp
     SettingsFields.cpp


@@ -10,7 +10,7 @@ PEERDIR(
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -44,6 +44,7 @@ SRCS(
     SquashingTransform.cpp
     TTLAggregationAlgorithm.cpp
     TTLBlockInputStream.cpp
+    TTLCalcInputStream.cpp
     TTLColumnAlgorithm.cpp
     TTLDeleteAlgorithm.cpp
     TTLUpdateInfoAlgorithm.cpp


@@ -9,7 +9,7 @@ PEERDIR(
 SRCS(
-    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+    <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
 )

 END()


@@ -416,7 +416,8 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
     return UUIDHelpers::Nil;
 }

-void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
+void DatabaseAtomic::loadStoredObjects(
+    ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
 {
     /// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
     if (has_force_restore_data_flag)
@@ -433,7 +434,7 @@ void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has
         }
     }

-    DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
+    DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);

     if (has_force_restore_data_flag)
     {


@@ -47,7 +47,7 @@ public:
     DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;

-    void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;

     /// Atomic database cannot be detached if there is detached table which still in use
     void assertCanBeDetached(bool cleanup) override;


@@ -36,9 +36,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
 void DatabaseLazy::loadStoredObjects(
-    ContextMutablePtr local_context,
-    bool /* has_force_restore_data_flag */,
-    bool /*force_attach*/)
+    ContextMutablePtr local_context, bool /* has_force_restore_data_flag */, bool /*force_attach*/, bool /* skip_startup_tables */)
 {
     iterateMetadataFiles(local_context, [this](const String & file_name)
     {
@@ -246,6 +244,8 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
         if (!ast || !endsWith(table->getName(), "Log"))
             throw Exception("Only *Log tables can be used with Lazy database engine.", ErrorCodes::LOGICAL_ERROR);

+        table->startup();
+
         {
             std::lock_guard lock(mutex);
             auto it = tables_cache.find(table_name);


@@ -26,9 +26,7 @@ public:
     bool canContainDistributedTables() const override { return false; }

-    void loadStoredObjects(
-        ContextMutablePtr context,
-        bool has_force_restore_data_flag, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;

     void createTable(
         ContextPtr context,


@@ -83,7 +83,8 @@ DatabaseOrdinary::DatabaseOrdinary(
 {
 }

-void DatabaseOrdinary::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/)
+void DatabaseOrdinary::loadStoredObjects(
+    ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/, bool skip_startup_tables)
 {
     /** Tables load faster if they are loaded in sorted (by name) order.
       * Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
@@ -201,12 +202,20 @@ void DatabaseOrdinary::loadStoredObjects(ContextMutablePtr local_context, bool h
     pool.wait();

-    /// After all tables was basically initialized, startup them.
-    startupTables(pool);
+    if (!skip_startup_tables)
+    {
+        /// After all tables was basically initialized, startup them.
+        startupTablesImpl(pool);
+    }
 }

+void DatabaseOrdinary::startupTables()
+{
+    ThreadPool pool;
+    startupTablesImpl(pool);
+}

-void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
+void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
 {
     LOG_INFO(log, "Starting up tables.");


@@ -20,7 +20,9 @@ public:
     String getEngineName() const override { return "Ordinary"; }

-    void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
+
+    void startupTables() override;

     void alterTable(
         ContextPtr context,
@@ -35,7 +37,7 @@ protected:
         const String & statement,
         ContextPtr query_context);

-    void startupTables(ThreadPool & thread_pool);
+    void startupTablesImpl(ThreadPool & thread_pool);
 };

 }


@@ -305,11 +305,12 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
     createEmptyLogEntry(current_zookeeper);
 }

-void DatabaseReplicated::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
+void DatabaseReplicated::loadStoredObjects(
+    ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
 {
     tryConnectToZooKeeperAndInitDatabase(force_attach);

-    DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
+    DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);

     ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
     ddl_worker->startup();


@@ -57,7 +57,7 @@ public:
     void drop(ContextPtr /*context*/) override;

-    void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;

     void shutdown() override;

     friend struct DatabaseReplicatedTask;


@@ -123,7 +123,15 @@ public:
     /// Load a set of existing tables.
     /// You can call only once, right after the object is created.
-    virtual void loadStoredObjects(ContextMutablePtr /*context*/, bool /*has_force_restore_data_flag*/, bool /*force_attach*/ = false) {}
+    virtual void loadStoredObjects(
+        ContextMutablePtr /*context*/,
+        bool /*has_force_restore_data_flag*/,
+        bool /*force_attach*/ = false,
+        bool /* skip_startup_tables */ = false)
+    {
+    }
+
+    virtual void startupTables() {}

     /// Check the existence of the table.
     virtual bool isTableExist(const String & name, ContextPtr context) const = 0;


@@ -93,10 +93,11 @@ void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & ex
     exception = exception_;
 }

-template<typename Base>
-void DatabaseMaterializedMySQL<Base>::loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach)
+template <typename Base>
+void DatabaseMaterializedMySQL<Base>::loadStoredObjects(
+    ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
 {
-    Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach);
+    Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach, skip_startup_tables);
     if (!force_attach)
         materialize_thread.assertMySQLAvailable();


@@ -43,7 +43,7 @@ protected:
 public:
     String getEngineName() const override { return "MaterializedMySQL"; }

-    void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;

     void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;


@@ -405,7 +405,7 @@ String DatabaseMySQL::getMetadataPath() const
     return metadata_path;
 }

-void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, bool, bool /*force_attach*/)
+void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, bool, bool /*force_attach*/, bool /* skip_startup_tables */)
 {
     std::lock_guard<std::mutex> lock{mutex};


@@ -75,7 +75,7 @@ public:
     void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;

-    void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;

     StoragePtr detachTable(const String & table_name) override;


@@ -109,9 +109,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
 }

-void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
+void DatabaseMaterializedPostgreSQL::loadStoredObjects(
+    ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
 {
-    DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
+    DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);

     try
     {
@@ -124,7 +125,6 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_c
         if (!force_attach)
             throw;
     }
-
 }


@@ -43,10 +43,10 @@ public:
     String getMetadataPath() const override { return metadata_path; }

-    void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;

-    DatabaseTablesIteratorPtr getTablesIterator(
-        ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
+    DatabaseTablesIteratorPtr
+    getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;

     StoragePtr tryGetTable(const String & name, ContextPtr context) const override;


@@ -280,7 +280,7 @@ void DatabasePostgreSQL::drop(ContextPtr /*context*/)
 }

-void DatabasePostgreSQL::loadStoredObjects(ContextMutablePtr /* context */, bool, bool /*force_attach*/)
+void DatabasePostgreSQL::loadStoredObjects(ContextMutablePtr /* context */, bool, bool /*force_attach*/, bool /* skip_startup_tables */)
 {
     {
         std::lock_guard<std::mutex> lock{mutex};


@@ -48,7 +48,7 @@ public:
     bool empty() const override;

-    void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
+    void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;

     DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;


@@ -11,7 +11,8 @@ enum class DiskType
     RAM,
     S3,
     HDFS,
-    Encrypted
+    Encrypted,
+    WebServer,
 };

 inline String toString(DiskType disk_type)
@@ -28,6 +29,8 @@ inline String toString(DiskType disk_type)
             return "hdfs";
         case DiskType::Encrypted:
             return "encrypted";
+        case DiskType::WebServer:
+            return "web";
     }
     __builtin_unreachable();
 }

src/Disks/DiskWebServer.cpp (new file)

@ -0,0 +1,336 @@
#include "DiskWebServer.h"
#include <common/logger_useful.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Disks/ReadIndirectBufferFromWebServer.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IDiskRemote.h>
#include <Access/AccessControlManager.h>
#include <Poco/Exception.h>
#include <re2/re2.h>
#define UUID_PATTERN "[\\w]{8}-[\\w]{4}-[\\w]{4}-[\\w]{4}-[\\w]{12}"
#define EXTRACT_UUID_PATTERN fmt::format(".*/({})/.*", UUID_PATTERN)
#define DIRECTORY_FILE_PATTERN(prefix) fmt::format("{}-({})-(\\w+)-(\\w+\\.\\w+)", prefix, UUID_PATTERN)
#define ROOT_FILE_PATTERN(prefix) fmt::format("{}-({})-(\\w+\\.\\w+)", prefix, UUID_PATTERN)
#define MATCH_DIRECTORY_FILE_PATTERN fmt::format(".*/({})/(\\w+)/(\\w+\\.\\w+)", UUID_PATTERN)
#define MATCH_ROOT_FILE_PATTERN fmt::format(".*/({})/(\\w+\\.\\w+)", UUID_PATTERN)
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int NETWORK_ERROR;
extern const int NOT_IMPLEMENTED;
}
void DiskWebServer::Metadata::initialize(const String & uri_with_path, const String & files_prefix, const String & table_uuid, ContextPtr context) const
{
ReadWriteBufferFromHTTP metadata_buf(Poco::URI(fs::path(uri_with_path) / (".index-" + table_uuid)),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(context));
String uuid, directory, file, remote_file_name;
size_t file_size;
while (!metadata_buf.eof())
{
readText(remote_file_name, metadata_buf);
assertChar('\t', metadata_buf);
readIntText(file_size, metadata_buf);
assertChar('\n', metadata_buf);
LOG_DEBUG(&Poco::Logger::get("DiskWeb"), "Read file: {}, size: {}", remote_file_name, file_size);
/*
* URI/ {prefix}-{uuid}-all_x_x_x-{file}
* ...
* {prefix}-{uuid}-format_version.txt
* {prefix}-{uuid}-detached-{file}
* ...
**/
if (RE2::FullMatch(remote_file_name, DIRECTORY_FILE_PATTERN(files_prefix), &uuid, &directory, &file))
{
if (uuid != table_uuid)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected uuid: {}, expected: {}", uuid, table_uuid);
tables_data[uuid][directory].emplace(File(file, file_size));
}
else if (RE2::FullMatch(remote_file_name, ROOT_FILE_PATTERN(files_prefix), &uuid, &file))
{
if (uuid != table_uuid)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected uuid: {}, expected: {}", uuid, table_uuid);
tables_data[uuid][file].emplace(File(file, file_size));
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", remote_file_name);
}
}
template <typename T>
class DiskWebDirectoryIterator final : public IDiskDirectoryIterator
{
public:
using Directory = std::unordered_map<String, T>;
DiskWebDirectoryIterator(Directory & directory_, const String & directory_root_)
: directory(directory_), iter(directory.begin()), directory_root(directory_root_)
{
}
void next() override { ++iter; }
bool isValid() const override
{
return iter != directory.end();
}
String path() const override
{
return fs::path(directory_root) / name();
}
String name() const override
{
return iter->first;
}
private:
Directory & directory;
typename Directory::iterator iter;
const String directory_root;
};
class ReadBufferFromWebServer final : public ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>
{
public:
ReadBufferFromWebServer(
const String & uri_,
RemoteMetadata metadata_,
ContextPtr context_,
size_t max_read_tries_,
size_t buf_size_)
: ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>(metadata_)
, uri(uri_)
, context(context_)
, max_read_tries(max_read_tries_)
, buf_size(buf_size_)
{
}
std::unique_ptr<ReadIndirectBufferFromWebServer> createReadBuffer(const String & path) override
{
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, max_read_tries, buf_size);
}
private:
String uri;
ContextPtr context;
size_t max_read_tries;
size_t buf_size;
};
/// Swallows everything written to it; used in writeFile() below for files (e.g. format_version.txt) that a read-only disk cannot store.
class WriteBufferFromNothing : public WriteBufferFromFile
{
public:
WriteBufferFromNothing() : WriteBufferFromFile("/dev/null") {}
void sync() override {}
};
DiskWebServer::DiskWebServer(
const String & disk_name_,
const String & uri_,
const String & metadata_path_,
ContextPtr context_,
SettingsPtr settings_)
: WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("DiskWeb"))
, uri(uri_)
, name(disk_name_)
, metadata_path(metadata_path_)
, settings(std::move(settings_))
{
}
String DiskWebServer::getFileName(const String & path) const
{
String result;
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN)
&& RE2::Extract(path, MATCH_DIRECTORY_FILE_PATTERN, fmt::format(R"({}-\1-\2-\3)", settings->files_prefix), &result))
return result;
if (RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN)
&& RE2::Extract(path, MATCH_ROOT_FILE_PATTERN, fmt::format(R"({}-\1-\2)", settings->files_prefix), &result))
return result;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", path);
}
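
/* A sketch of the mapping performed above, assuming files_prefix == "data" and the table
 * UUID from the usage example in DiskWebServer.h (the local path is hypothetical):
 *
 *   getFileName("/store/a563f7d8-fb00-4d50-a563-f7d8fb007d50/all_1_1_0/primary.idx")
 *       -> "data-a563f7d8-fb00-4d50-a563-f7d8fb007d50-all_1_1_0-primary.idx"
 *   getFileName("/store/a563f7d8-fb00-4d50-a563-f7d8fb007d50/format_version.txt")
 *       -> "data-a563f7d8-fb00-4d50-a563-f7d8fb007d50-format_version.txt"
 */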
bool DiskWebServer::findFileInMetadata(const String & path, File & file_info) const
{
String table_uuid, directory_name, file_name;
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN, &table_uuid, &directory_name, &file_name)
|| RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN, &table_uuid, &file_name))
{
if (directory_name.empty())
directory_name = file_name;
if (!metadata.tables_data.count(table_uuid))
return false;
if (!metadata.tables_data[table_uuid].count(directory_name))
return false;
const auto & files = metadata.tables_data[table_uuid][directory_name];
auto file = files.find(File(file_name));
if (file == files.end())
return false;
file_info = *file;
return true;
}
return false;
}
bool DiskWebServer::exists(const String & path) const
{
LOG_DEBUG(log, "Checking existence of file: {}", path);
File file;
return findFileInMetadata(path, file);
}
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, const ReadSettings & read_settings, size_t) const
{
LOG_DEBUG(log, "Read from file by path: {}", path);
File file;
if (!findFileInMetadata(path, file))
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
RemoteMetadata meta(uri, fs::path(path).parent_path() / fs::path(path).filename());
meta.remote_fs_objects.emplace_back(std::make_pair(getFileName(path), file.size));
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, read_settings.remote_fs_buffer_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskWebServer::writeFile(const String & path, size_t, WriteMode)
{
if (path.ends_with("format_version.txt"))
return std::make_unique<WriteBufferFromNothing>();
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
{
LOG_DEBUG(log, "Iterate directory: {}", path);
String uuid;
if (RE2::FullMatch(path, ".*/store/"))
return std::make_unique<DiskWebDirectoryIterator<RootDirectory>>(metadata.tables_data, path);
if (!RE2::Extract(path, EXTRACT_UUID_PATTERN, "\\1", &uuid))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot extract uuid for: {}", path);
/// Throw only when called from a query context; during disk load errors are just logged.
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
try
{
if (!metadata.tables_data.count(uuid))
metadata.initialize(uri, settings->files_prefix, uuid, getContext());
}
catch (const Poco::Exception &)
{
const auto message = getCurrentExceptionMessage(false);
if (can_throw)
{
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
}
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Cannot load disk metadata. Error: {}", message);
/// Empty iterator.
return std::make_unique<DiskWebDirectoryIterator<Directory>>(metadata.tables_data[""], path);
}
return std::make_unique<DiskWebDirectoryIterator<Directory>>(metadata.tables_data[uuid], path);
}
size_t DiskWebServer::getFileSize(const String & path) const
{
File file;
if (!findFileInMetadata(path, file))
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
return file.size;
}
bool DiskWebServer::isFile(const String & path) const
{
return RE2::FullMatch(path, ".*/\\w+\\.\\w+"); /// Escaped dot: a file is "<name>.<extension>", consistent with the patterns above.
}
bool DiskWebServer::isDirectory(const String & path) const
{
return RE2::FullMatch(path, ".*/\\w+");
}
void registerDiskWebServer(DiskFactory & factory)
{
auto creator = [](const String & disk_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String uri{config.getString(config_prefix + ".endpoint")};
if (!uri.ends_with('/'))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "URI must end with '/', but '{}' doesn't.", uri);
auto settings = std::make_unique<DiskWebServerSettings>(
context->getGlobalContext()->getSettingsRef().http_max_single_read_retries,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getString(config_prefix + ".files_prefix", disk_name));
String metadata_path = fs::path(context->getPath()) / "disks" / disk_name / "";
return std::make_shared<DiskWebServer>(disk_name, uri, metadata_path, context, std::move(settings));
};
factory.registerDiskType("web", creator);
}
}

src/Disks/DiskWebServer.h
@@ -0,0 +1,234 @@
#pragma once
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h>
#include <Core/UUID.h>
#include <set>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
struct DiskWebServerSettings
{
/// Number of read attempts before throwing an exception that the network is unreachable.
size_t max_read_tries;
/// Passed to SeekAvoidingReadBuffer.
size_t min_bytes_for_seek;
String files_prefix;
DiskWebServerSettings(size_t max_read_tries_, size_t min_bytes_for_seek_, String files_prefix_)
: max_read_tries(max_read_tries_), min_bytes_for_seek(min_bytes_for_seek_), files_prefix(files_prefix_) {}
};
/*
* Quick ready-made test: you can try this disk with the queries below (the endpoint hosts two tables) and this configuration:
*
* ATTACH TABLE contributors UUID 'a563f7d8-fb00-4d50-a563-f7d8fb007d50' (good_person_name String) engine=MergeTree() order by good_person_name settings storage_policy='web';
* ATTACH TABLE test UUID '11c7a2f9-a949-4c88-91c7-a2f9a949ec88' (a Int32) engine=MergeTree() order by a settings storage_policy='web';
*
* <storage_configuration>
* <disks>
* <web>
* <type>web</type>
* <endpoint>https://clickhouse-datasets.s3.yandex.net/kssenii-static-files-disk-test/kssenii-disk-tests/test1/</endpoint>
* <files_prefix>data</files_prefix>
* </web>
* </disks>
* <policies>
* <web>
* <volumes>
* <main>
* <disk>web</disk>
* </main>
* </volumes>
* </web>
* </policies>
* </storage_configuration>
*
* To get files for upload run:
* clickhouse static-files-disk-uploader --metadata-path <path> --output-dir <dir> --files-prefix data
* (--metadata-path can be found in query: `select data_paths from system.tables where name='<table_name>';`)
*
* If the URL is unreachable during disk load (while the server is starting up its tables), all errors are caught.
* Tables that failed to load this way can be reloaded (made visible) later via DETACH TABLE table_name -> ATTACH TABLE table_name.
* If metadata was loaded successfully at server startup, the tables are available straight away.
**/
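/* A quick sanity check after attaching (a sketch; the tables come from the ATTACH examples above):
 *
 *   SELECT count() FROM test;
 *   SELECT good_person_name FROM contributors LIMIT 1;
 */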
class DiskWebServer : public IDisk, WithContext
{
using SettingsPtr = std::unique_ptr<DiskWebServerSettings>;
public:
DiskWebServer(const String & disk_name_,
const String & files_root_path_uri_,
const String & metadata_path_,
ContextPtr context,
SettingsPtr settings_);
struct File
{
String name;
size_t size;
File(const String & name_ = "", const size_t size_ = 0) : name(name_), size(size_) {}
bool operator<(const File & other) const { return name < other.name; }
bool operator==(const File & other) const { return name == other.name; }
};
using Directory = std::set<File>;
/* Each root directory contains either directories like
* all_x_x_x/{file}, detached/, etc, or root files like format_version.txt.
*/
using RootDirectory = std::unordered_map<String, Directory>;
/* Each table is attached via ATTACH TABLE table UUID <uuid> <def>.
* Then there is a mapping: {table uuid} -> {root directory}
*/
using TableDirectories = std::unordered_map<String, RootDirectory>;
struct Metadata
{
/// Fetch meta only when required.
mutable TableDirectories tables_data;
Metadata() {}
void initialize(const String & uri_with_path, const String & files_prefix, const String & uuid, ContextPtr context) const;
};
bool findFileInMetadata(const String & path, File & file_info) const;
bool supportZeroCopyReplication() const override { return false; }
String getFileName(const String & path) const;
DiskType getType() const override { return DiskType::WebServer; }
bool isRemote() const override { return true; }
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path,
const ReadSettings & settings,
size_t estimated_size) const override;
/// Disk info
const String & getName() const final override { return name; }
const String & getPath() const final override { return metadata_path; }
bool isReadOnly() const override { return true; }
UInt64 getTotalSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const final override { return std::numeric_limits<UInt64>::max(); }
/// Read-only part
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
size_t getFileSize(const String & path) const override;
void listFiles(const String & /* path */, std::vector<String> & /* file_names */) override { }
void setReadOnly(const String & /* path */) override {}
bool isDirectory(const String & path) const override;
DiskDirectoryIteratorPtr iterateDirectory(const String & /* path */) override;
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; }
/// Write and modification part
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String &, size_t, WriteMode) override;
void moveFile(const String &, const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void replaceFile(const String &, const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeFile(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeFileIfExists(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
ReservationPtr reserve(UInt64 /*bytes*/) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeRecursive(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeSharedFile(const String &, bool) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeSharedRecursive(const String &, bool) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void clearDirectory(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void moveDirectory(const String &, const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeDirectory(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void setLastModified(const String &, const Poco::Timestamp &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
/// Create part
void createFile(const String &) final override {}
void createDirectory(const String &) override {}
void createDirectories(const String &) override {}
void createHardLink(const String &, const String &) override {}
private:
Poco::Logger * log;
String uri, name;
const String metadata_path;
SettingsPtr settings;
Metadata metadata;
};
}

@@ -226,6 +226,8 @@ public:
    /// Overridden in remote FS disks.
    virtual bool supportZeroCopyReplication() const = 0;

+   virtual bool isReadOnly() const { return false; }
+
    /// Invoked when Global Context is shutdown.
    virtual void shutdown() {}

@@ -33,10 +33,9 @@ IDiskRemote::Metadata::Metadata(
    const String & disk_path_,
    const String & metadata_file_path_,
    bool create)
-   : remote_fs_root_path(remote_fs_root_path_)
+   : RemoteMetadata(remote_fs_root_path_, metadata_file_path_)
    , disk_path(disk_path_)
-   , metadata_file_path(metadata_file_path_)
-   , total_size(0), remote_fs_objects(0), ref_count(0)
+   , total_size(0), ref_count(0)
{
    if (create)
        return;
@@ -72,10 +71,9 @@ IDiskRemote::Metadata::Metadata(
        readEscapedString(remote_fs_object_path, buf);
        if (version == VERSION_ABSOLUTE_PATHS)
        {
-           if (!boost::algorithm::starts_with(remote_fs_object_path, remote_fs_root_path))
-               throw Exception(
-                   ErrorCodes::UNKNOWN_FORMAT,
-                   "Path in metadata does not correspond S3 root path. Path: {}, root path: {}, disk path: {}",
+           if (!remote_fs_object_path.starts_with(remote_fs_root_path))
+               throw Exception(ErrorCodes::UNKNOWN_FORMAT,
+                   "Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
                    remote_fs_object_path, remote_fs_root_path, disk_path_);
            remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());

@@ -148,34 +148,42 @@ private:
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;

-/// Remote FS (S3, HDFS) metadata file layout:
-/// Number of FS objects, Total size of all FS objects.
-/// Each FS object represents path where object located in FS and size of object.
-struct IDiskRemote::Metadata
+/// Minimum info, required to be passed to ReadIndirectBufferFromRemoteFS<T>
+struct RemoteMetadata
+{
+    using PathAndSize = std::pair<String, size_t>;
+
+    /// Remote FS objects paths and their sizes.
+    std::vector<PathAndSize> remote_fs_objects;
+
+    /// URI
+    const String & remote_fs_root_path;
+
+    /// Relative path to metadata file on local FS.
+    const String metadata_file_path;
+
+    RemoteMetadata(const String & remote_fs_root_path_, const String & metadata_file_path_)
+        : remote_fs_root_path(remote_fs_root_path_), metadata_file_path(metadata_file_path_) {}
+};
+
+/// Remote FS (S3, HDFS) metadata file layout:
+/// FS objects, their number and total size of all FS objects.
+/// Each FS object represents a file path in remote FS and its size.
+struct IDiskRemote::Metadata : RemoteMetadata
{
    /// Metadata file version.
    static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
    static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
    static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;

-   using PathAndSize = std::pair<String, size_t>;
-
-   /// Remote FS (S3, HDFS) root path.
-   const String & remote_fs_root_path;
-
    /// Disk path.
    const String & disk_path;

-   /// Relative path to metadata file on local FS.
-   String metadata_file_path;
-
    /// Total size of all remote FS (S3, HDFS) objects.
    size_t total_size = 0;

-   /// Remote FS (S3, HDFS) objects paths and their sizes.
-   std::vector<PathAndSize> remote_fs_objects;
-
    /// Number of references (hardlinks) to this metadata file.
    UInt32 ref_count = 0;

@@ -1,8 +1,8 @@
#include "ReadIndirectBufferFromRemoteFS.h"

-#if USE_AWS_S3 || USE_HDFS
-
#include <IO/ReadBufferFromS3.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
+#include <Disks/ReadIndirectBufferFromWebServer.h>

namespace DB
@@ -16,7 +16,7 @@ namespace ErrorCodes
template<typename T>
ReadIndirectBufferFromRemoteFS<T>::ReadIndirectBufferFromRemoteFS(
-   IDiskRemote::Metadata metadata_)
+   RemoteMetadata metadata_)
    : metadata(std::move(metadata_))
{
}
@@ -137,6 +137,7 @@ template
class ReadIndirectBufferFromRemoteFS<ReadBufferFromHDFS>;
#endif

-}
+template
+class ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>;

-#endif
+}

@@ -4,8 +4,6 @@
#include <Common/config.h>
#endif

-#if USE_AWS_S3 || USE_HDFS
-
#include <IO/ReadBufferFromFile.h>
#include <Disks/IDiskRemote.h>
#include <utility>
@@ -19,7 +17,7 @@ template <typename T>
class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
-   explicit ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_);
+   explicit ReadIndirectBufferFromRemoteFS(RemoteMetadata metadata_);

    off_t seek(off_t offset_, int whence) override;
@@ -30,7 +28,7 @@ public:
    virtual std::unique_ptr<T> createReadBuffer(const String & path) = 0;

protected:
-   IDiskRemote::Metadata metadata;
+   RemoteMetadata metadata;

private:
    std::unique_ptr<T> initialize();
@@ -47,5 +45,3 @@ private:
};

}
-
-#endif

@@ -0,0 +1,132 @@
#include "ReadIndirectBufferFromWebServer.h"
#include <common/logger_useful.h>
#include <Core/Types.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <thread>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int NETWORK_ERROR;
}
ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer(const String & url_,
ContextPtr context_,
size_t max_read_tries_,
size_t buf_size_)
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size_)
, log(&Poco::Logger::get("ReadIndirectBufferFromWebServer"))
, context(context_)
, url(url_)
, buf_size(buf_size_)
, max_read_tries(max_read_tries_)
{
}
std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
{
Poco::URI uri(url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
LOG_DEBUG(log, "Reading from offset: {}", offset);
return std::make_unique<ReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(context),
0,
Poco::Net::HTTPBasicCredentials{},
buf_size,
headers);
}
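
/* For illustration (the offset is hypothetical): after seek(1048576, SEEK_SET), the buffer
 * created here sends the header "Range: bytes=1048576-", and a server that supports ranges
 * answers "206 Partial Content". This is why assertResponseIsOk() in HTTPCommon.cpp (see
 * the corresponding hunk further below) now also accepts HTTP_PARTIAL_CONTENT.
 */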
bool ReadIndirectBufferFromWebServer::nextImpl()
{
bool next_result = false, successful_read = false;
if (impl)
{
/// Restore correct position at the needed offset.
impl->position() = position();
assert(!impl->hasPendingData());
}
else
{
try
{
impl = initialize();
}
catch (const Poco::Exception & e)
{
throw Exception(ErrorCodes::NETWORK_ERROR, "Unreachable url: {}. Error: {}", url, e.what());
}
next_result = impl->hasPendingData();
}
for (size_t try_num = 0; (try_num < max_read_tries) && !next_result; ++try_num)
{
    try
    {
        next_result = impl->next();
        successful_read = true;
        break;
    }
    catch (const Exception & e)
    {
        LOG_WARNING(log, "Read attempt {}/{} failed from {}. ({})", try_num + 1, max_read_tries, url, e.message());
        impl.reset();
        impl = initialize();
        next_result = impl->hasPendingData();
    }
}
/// Do not throw if a re-created buffer already has pending data: that read also succeeded.
if (!successful_read && !next_result)
    throw Exception(ErrorCodes::NETWORK_ERROR, "All read attempts ({}) failed for uri: {}", max_read_tries, url);
if (next_result)
{
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
offset += working_buffer.size();
}
return next_result;
}
off_t ReadIndirectBufferFromWebServer::seek(off_t offset_, int whence)
{
if (impl)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer");
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed");
if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", std::to_string(offset_));
offset = offset_;
return offset;
}
off_t ReadIndirectBufferFromWebServer::getPosition()
{
return offset - available();
}
}

@@ -0,0 +1,44 @@
#pragma once
#include <IO/SeekableReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Interpreters/Context.h>
namespace DB
{
/* Read buffer, which reads via http, but is used as ReadBufferFromFileBase.
* Used to read files, hosted on a web server with static files.
*
* Usage: ReadIndirectBufferFromRemoteFS -> SeekAvoidingReadBuffer -> ReadIndirectBufferFromWebServer -> ReadWriteBufferFromHTTP.
*/
class ReadIndirectBufferFromWebServer : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
explicit ReadIndirectBufferFromWebServer(const String & url_,
ContextPtr context_,
size_t max_read_tries_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
private:
std::unique_ptr<ReadBuffer> initialize();
Poco::Logger * log;
ContextPtr context;
const String url;
size_t buf_size, max_read_tries;
std::unique_ptr<ReadBuffer> impl;
off_t offset = 0;
};
}

@@ -146,7 +146,7 @@ public:
    std::unique_ptr<ReadBufferFromS3> createReadBuffer(const String & path) override
    {
-       return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.remote_fs_root_path + path, max_single_read_retries, buf_size);
+       return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, buf_size);
    }

private:

@@ -1,8 +1,8 @@
#include "WriteIndirectBufferFromRemoteFS.h"

-#if USE_AWS_S3 || USE_HDFS
-
#include <IO/WriteBufferFromS3.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
+#include <IO/WriteBufferFromHTTP.h>

namespace DB
@@ -65,6 +65,7 @@ template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>;
#endif

-}
+template
+class WriteIndirectBufferFromRemoteFS<WriteBufferFromHTTP>;

-#endif
+}

@@ -4,8 +4,6 @@
#include <Common/config.h>
#endif

-#if USE_AWS_S3 || USE_HDFS
-
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h>
@@ -38,5 +36,3 @@ private:
};

}
-
-#endif

@@ -24,6 +24,8 @@ void registerDiskEncrypted(DiskFactory & factory);
void registerDiskHDFS(DiskFactory & factory);
#endif

+void registerDiskWebServer(DiskFactory & factory);
+
void registerDisks()
{
@@ -43,6 +45,8 @@ void registerDisks()
#if USE_HDFS
    registerDiskHDFS(factory);
#endif

+    registerDiskWebServer(factory);
}

}

@@ -16,6 +16,7 @@ SRCS(
    DiskMemory.cpp
    DiskRestartProxy.cpp
    DiskSelector.cpp
+   DiskWebServer.cpp
    IDisk.cpp
    IDiskRemote.cpp
    IVolume.cpp
@@ -23,6 +24,7 @@ SRCS(
    ReadIndirectBufferFromRemoteFS.cpp
    SingleDiskVolume.cpp
    StoragePolicy.cpp
+   TemporaryFileOnDisk.cpp
    VolumeJBOD.cpp
    VolumeRAID1.cpp
    WriteIndirectBufferFromRemoteFS.cpp

@@ -7,7 +7,7 @@ PEERDIR(
)

SRCS(
-   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
+   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
)

END()

@@ -161,14 +161,12 @@ InputFormatPtr FormatFactory::getInput(
    if (settings.max_memory_usage_for_user && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage_for_user)
        parallel_parsing = false;

-   if (parallel_parsing && name == "JSONEachRow")
+   if (parallel_parsing)
    {
-       /// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix()
-
-       /// For JSONEachRow we can safely skip whitespace characters
-       skipWhitespaceIfAny(buf);
-       if (buf.eof() || *buf.position() == '[')
-           parallel_parsing = false; /// Disable it for JSONEachRow if data is in square brackets (see JSONEachRowRowInputFormat)
+       const auto & non_trivial_prefix_and_suffix_checker = getCreators(name).non_trivial_prefix_and_suffix_checker;
+       /// Disable parallel parsing for input formats with non-trivial readPrefix() and readSuffix().
+       if (non_trivial_prefix_and_suffix_checker && non_trivial_prefix_and_suffix_checker(buf))
+           parallel_parsing = false;
    }

    if (parallel_parsing)
@@ -392,6 +390,14 @@ void FormatFactory::registerInputFormatProcessor(const String & name, InputProce
    target = std::move(input_creator);
}

+void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker)
+{
+    auto & target = dict[name].non_trivial_prefix_and_suffix_checker;
+    if (target)
+        throw Exception("FormatFactory: Non trivial prefix and suffix checker " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
+    target = std::move(non_trivial_prefix_and_suffix_checker);
+}
+
void FormatFactory::registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator)
{
    auto & target = dict[name].output_processor_creator;

@@ -93,6 +93,11 @@ private:
        const RowOutputFormatParams & params,
        const FormatSettings & settings)>;

+   /// Some input formats can have non trivial readPrefix() and readSuffix(),
+   /// so in some cases there is no possibility to use parallel parsing.
+   /// The checker should return true if parallel parsing should be disabled.
+   using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
+
    struct Creators
    {
        InputCreator input_creator;
@@ -102,6 +107,7 @@ private:
        FileSegmentationEngine file_segmentation_engine;
        bool supports_parallel_formatting{false};
        bool is_column_oriented{false};
+       NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
    };

    using FormatsDictionary = std::unordered_map<String, Creators>;
@@ -166,6 +172,8 @@ public:
    void registerOutputFormat(const String & name, OutputCreator output_creator);
    void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
+   void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
    void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
    void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);

@@ -87,4 +87,11 @@ std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, D
    return {loadAtPosition(in, memory, pos), number_of_rows};
}

+bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf)
+{
+    /// For JSONEachRow we can safely skip whitespace characters
+    skipWhitespaceIfAny(buf);
+    return buf.eof() || *buf.position() == '[';
+}
+
}
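
A minimal sketch of the checker's behavior (hypothetical snippet, assuming <IO/ReadBufferFromString.h> for an in-memory buffer):

    ReadBufferFromString rows("{\"a\": 1}\n{\"a\": 2}");
    bool disable = nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(rows);      /// false: plain rows, parallel parsing stays enabled

    ReadBufferFromString bracketed("  [ {\"a\": 1}, {\"a\": 2} ]");
    disable = nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(bracketed);      /// true: data in square brackets, parsed sequentially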

@@ -5,4 +5,6 @@ namespace DB
std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);

+bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf);
+
}

@@ -79,6 +79,9 @@ void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory);

+/// Non trivial prefix and suffix checkers for disabling parallel parsing.
+void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory);
+void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factory);
+
void registerFormats()
{
@@ -153,6 +156,9 @@ void registerFormats()
#if !defined(ARCADIA_BUILD)
    registerInputFormatProcessorCapnProto(factory);
#endif
+
+    registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
+    registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);
}

}

@@ -10,7 +10,7 @@ PEERDIR(
SRCS(
-   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)

END()

@@ -290,6 +290,7 @@ SRCS(
    geohashesInBox.cpp
    getMacro.cpp
    getScalar.cpp
+   getServerPort.cpp
    getSetting.cpp
    getSizeOfEnumType.cpp
    globalVariable.cpp
@@ -444,6 +445,7 @@ SRCS(
    registerFunctionsNull.cpp
    registerFunctionsRandom.cpp
    registerFunctionsReinterpret.cpp
+   registerFunctionsSnowflake.cpp
    registerFunctionsString.cpp
    registerFunctionsStringRegexp.cpp
    registerFunctionsStringSearch.cpp
@@ -477,12 +479,14 @@ SRCS(
    s2RectIntersection.cpp
    s2RectUnion.cpp
    s2ToGeo.cpp
+   serverUUID.cpp
    sigmoid.cpp
    sign.cpp
    sin.cpp
    sinh.cpp
    sleep.cpp
    sleepEachRow.cpp
+   snowflake.cpp
    sqrt.cpp
    startsWith.cpp
    stem.cpp

@@ -315,6 +315,7 @@ void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPR
    if (!(status == Poco::Net::HTTPResponse::HTTP_OK
        || status == Poco::Net::HTTPResponse::HTTP_CREATED
        || status == Poco::Net::HTTPResponse::HTTP_ACCEPTED
+       || status == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT /// Reading with Range header was successful.
        || (isRedirect(status) && allow_redirects)))
    {
        std::stringstream error_message; // STYLE_CHECK_ALLOW_STD_STRING_STREAM

@@ -56,6 +56,7 @@ SRCS(
    ReadBufferFromMemory.cpp
    ReadBufferFromPocoSocket.cpp
    ReadHelpers.cpp
+   ReadIndirectBufferFromWebServer.cpp
    SeekAvoidingReadBuffer.cpp
    TimeoutSetter.cpp
    UseSSL.cpp

@@ -18,7 +18,7 @@ PEERDIR(
SRCS(
-   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
+   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
)

END()

@@ -157,6 +157,15 @@ void DatabaseCatalog::loadDatabases()
    /// Another background thread which drops temporary LiveViews.
    /// We should start it after loadMarkedAsDroppedTables() to avoid race condition.
    TemporaryLiveViewCleaner::instance().startup();

+   /// Start up tables after all databases are loaded.
+   for (const auto & [database_name, database] : databases)
+   {
+       if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
+           continue;
+
+       database->startupTables();
+   }
}

void DatabaseCatalog::shutdownImpl()

@@ -32,6 +32,7 @@ namespace ErrorCodes
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_QUERY;
    extern const int NOT_IMPLEMENTED;
+   extern const int TABLE_IS_READ_ONLY;
}
@@ -62,6 +63,8 @@ BlockIO InterpreterAlterQuery::execute()
    }

    StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
+   if (table->isReadOnly())
+       throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
    auto alter_lock = table->lockForAlter(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
    auto metadata_snapshot = table->getInMemoryMetadataPtr();

@@ -272,7 +272,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
        }

        /// We use global context here, because storages lifetime is bigger than query context lifetime
-       database->loadStoredObjects(getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach); //-V560
+       database->loadStoredObjects(
+           getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach, skip_startup_tables); //-V560
    }
    catch (...)
    {

@@ -52,6 +52,11 @@ public:
        force_attach = force_attach_;
    }

+   void setSkipStartupTables(bool skip_startup_tables_)
+   {
+       skip_startup_tables = skip_startup_tables_;
+   }
+
    /// Obtain information about columns, their types, default values and column comments,
    /// for case when columns in CREATE query is specified explicitly.
    static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach);
@@ -94,6 +99,7 @@ private:
    /// Is this an internal query - not from the user.
    bool internal = false;
    bool force_attach = false;
+   bool skip_startup_tables = false;

    mutable String as_database_saved;
    mutable String as_table_saved;

@@ -34,6 +34,7 @@ namespace ErrorCodes
    extern const int UNKNOWN_TABLE;
    extern const int NOT_IMPLEMENTED;
    extern const int INCORRECT_QUERY;
+   extern const int TABLE_IS_READ_ONLY;
}
@@ -162,6 +163,8 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
    if (query.kind == ASTDropQuery::Kind::Detach)
    {
        getContext()->checkAccess(drop_storage, table_id);
+       if (table->isReadOnly())
+           throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");

        if (table->isDictionary())
        {
@@ -195,6 +198,8 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
            throw Exception("Cannot TRUNCATE dictionary", ErrorCodes::SYNTAX_ERROR);

        getContext()->checkAccess(AccessType::TRUNCATE, table_id);
+       if (table->isReadOnly())
+           throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");

        table->checkTableCanBeDropped();

@@ -288,7 +288,7 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
#if defined(ARCADIA_BUILD)
    /// This is harmful field that is used only in foreign "Arcadia" build.
    if (const auto * basic_credentials = dynamic_cast<const BasicCredentials *>(&credentials_))
-       session_client_info->current_password = basic_credentials->getPassword();
+       prepared_client_info->current_password = basic_credentials->getPassword();
#endif
}

@@ -43,6 +43,7 @@ static void executeCreateQuery(
    interpreter.setInternal(true);
    interpreter.setForceAttach(true);
    interpreter.setForceRestoreData(has_force_restore_data_flag);
+   interpreter.setSkipStartupTables(true);
    interpreter.execute();
}

@@ -70,6 +70,7 @@ SRCS(
    InJoinSubqueriesPreprocessor.cpp
    InternalTextLogsQueue.cpp
    InterpreterAlterQuery.cpp
+   InterpreterBackupQuery.cpp
    InterpreterCheckQuery.cpp
    InterpreterCreateFunctionQuery.cpp
    InterpreterCreateQuery.cpp
@@ -146,6 +147,7 @@ SRCS(
    RewriteSumIfFunctionVisitor.cpp
    RowRefs.cpp
    SelectIntersectExceptQueryVisitor.cpp
+   Session.cpp
    Set.cpp
    SetVariants.cpp
    SortedBlocksWriter.cpp

@@ -11,6 +11,7 @@ PEERDIR(
)

SRCS(
    ASTAlterQuery.cpp
    ASTAsterisk.cpp
+   ASTBackupQuery.cpp
    ASTColumnDeclaration.cpp
    ASTColumnsMatcher.cpp
    ASTColumnsTransformers.cpp
@@ -51,6 +52,7 @@ SRCS(
    ASTRolesOrUsersSet.cpp
    ASTRowPolicyName.cpp
    ASTSampleRatio.cpp
+   ASTSelectIntersectExceptQuery.cpp
    ASTSelectQuery.cpp
    ASTSelectWithUnionQuery.cpp
    ASTSetQuery.cpp
@@ -89,6 +91,7 @@ SRCS(
    MySQL/ASTDeclareSubPartition.cpp
    MySQL/ASTDeclareTableOptions.cpp
    ParserAlterQuery.cpp
+   ParserBackupQuery.cpp
    ParserCase.cpp
    ParserCheckQuery.cpp
    ParserCreateFunctionQuery.cpp
@@ -142,6 +145,7 @@ SRCS(
    TokenIterator.cpp
    formatAST.cpp
    formatSettingName.cpp
+   getInsertQuery.cpp
    iostream_debug_helpers.cpp
    makeASTForLogicalFunction.cpp
    obfuscateQueries.cpp

@@ -8,7 +8,7 @@ PEERDIR(
SRCS(
-   <? find . -name '*.cpp' | grep -v -F New | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
+   <? find . -name '*.cpp' | grep -v -F New | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)

END()

@@ -34,6 +34,35 @@ void JSONAsStringRowInputFormat::resetParser()
    buf.reset();
}

+void JSONAsStringRowInputFormat::readPrefix()
+{
+    /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
+    skipBOMIfExists(buf);
+
+    skipWhitespaceIfAny(buf);
+    if (!buf.eof() && *buf.position() == '[')
+    {
+        ++buf.position();
+        data_in_square_brackets = true;
+    }
+}
+
+void JSONAsStringRowInputFormat::readSuffix()
+{
+    skipWhitespaceIfAny(buf);
+    if (data_in_square_brackets)
+    {
+        assertChar(']', buf);
+        skipWhitespaceIfAny(buf);
+    }
+    if (!buf.eof() && *buf.position() == ';')
+    {
+        ++buf.position();
+        skipWhitespaceIfAny(buf);
+    }
+    assertEOF(buf);
+}
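
For illustration, a hypothetical input that the two methods above make parseable: readPrefix() consumes the opening '[', readRow() stops at ']', and readSuffix() consumes the trailing "];":

    [{"id": 1, "str": "a"}, {"id": 2, "str": "b"}];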
void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
{
    PeekableReadBufferCheckpoint checkpoint{buf};
@@ -113,7 +142,23 @@ void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
+   if (!allow_new_rows)
+       return false;
    skipWhitespaceIfAny(buf);

+   if (!buf.eof())
+   {
+       if (!data_in_square_brackets && *buf.position() == ';')
+       {
+           /// ';' means the end of query, but it cannot be before ']'.
+           return allow_new_rows = false;
+       }
+       else if (data_in_square_brackets && *buf.position() == ']')
+       {
+           /// ']' means the end of query.
+           return allow_new_rows = false;
+       }
+   }
+
    if (!buf.eof())
        readJSONObject(*columns[0]);
@@ -143,4 +188,9 @@ void registerFileSegmentationEngineJSONAsString(FormatFactory & factory)
    factory.registerFileSegmentationEngine("JSONAsString", &fileSegmentationEngineJSONEachRowImpl);
}

+void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factory)
+{
+    factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsString", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
+}
+
}

@@ -22,10 +22,17 @@ public:
    String getName() const override { return "JSONAsStringRowInputFormat"; }
    void resetParser() override;

+   void readPrefix() override;
+   void readSuffix() override;
+
private:
    void readJSONObject(IColumn & column);

    PeekableReadBuffer buf;

+   /// This flag is needed to know if data is in square brackets.
+   bool data_in_square_brackets = false;
+   bool allow_new_rows = true;
};

}

@@ -359,4 +359,10 @@ void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
    factory.registerFileSegmentationEngine("JSONStringsEachRow", &fileSegmentationEngineJSONEachRowImpl);
}

+void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory)
+{
+    factory.registerNonTrivialPrefixAndSuffixChecker("JSONEachRow", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
+    factory.registerNonTrivialPrefixAndSuffixChecker("JSONStringsEachRow", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
+}
+
}

@@ -118,6 +118,7 @@ SRCS(
    QueryPlan/IQueryPlanStep.cpp
    QueryPlan/ISourceStep.cpp
    QueryPlan/ITransformingStep.cpp
+   QueryPlan/IntersectOrExceptStep.cpp
    QueryPlan/JoinStep.cpp
    QueryPlan/LimitByStep.cpp
    QueryPlan/LimitStep.cpp
@@ -165,6 +166,7 @@ SRCS(
    Transforms/FillingTransform.cpp
    Transforms/FilterTransform.cpp
    Transforms/FinishSortingTransform.cpp
+   Transforms/IntersectOrExceptTransform.cpp
    Transforms/JoiningTransform.cpp
    Transforms/LimitByTransform.cpp
    Transforms/LimitsCheckingTransform.cpp

@@ -16,7 +16,7 @@ ADDINCL(
CFLAGS(-DUSE_ARROW=1)

SRCS(
-   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -P 'Avro|ORC|Parquet|CapnProto' | sed 's/^\.\// /' | sort ?>
+   <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -P 'Avro|ORC|Parquet|CapnProto' | sed 's/^\.\// /' | sort ?>
)

END()

@@ -1,6 +1,8 @@
#pragma once

-#include <Common/config.h>
+#if !defined(ARCADIA_BUILD)
+#include <Common/config.h>
+#endif

#if USE_HDFS
#include <IO/ReadBuffer.h>

@@ -1,6 +1,8 @@
#pragma once

-#include <Common/config.h>
+#if !defined(ARCADIA_BUILD)
+#include <Common/config.h>
+#endif

#if USE_HDFS
#include <IO/WriteBuffer.h>
@@ -19,7+21,7 @@ class WriteBufferFromHDFS final : public BufferWithOwnMemory<WriteBuffer>
public:
    WriteBufferFromHDFS(
-       const std::string & hdfs_name_,
+       const String & hdfs_name_,
        const Poco::Util::AbstractConfiguration & config_,
        size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
        int flags = O_WRONLY);

@@ -201,6 +201,19 @@ NameDependencies IStorage::getDependentViewsByColumn(ContextPtr context) const
    return name_deps;
}

+bool IStorage::isReadOnly() const
+{
+    auto storage_policy = getStoragePolicy();
+    if (storage_policy)
+    {
+        for (const auto & disk : storage_policy->getDisks())
+            if (!disk->isReadOnly())
+                return false;
+        return true;
+    }
+    return false;
+}
+
BackupEntries IStorage::backup(const ASTs &, ContextPtr) const
{
    throw Exception("Table engine " + getName() + " doesn't support backups", ErrorCodes::NOT_IMPLEMENTED);

@@ -539,6 +539,9 @@ public:
    /// Returns storage policy if storage supports it.
    virtual StoragePolicyPtr getStoragePolicy() const { return {}; }

+   /// Returns true if all disks of storage are read-only.
+   virtual bool isReadOnly() const;
+
    /// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
    /// Used for:
    /// - Simple count() optimization

@@ -2593,6 +2593,8 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
        LOG_TRACE(log, "Trying to immediately remove part {}", part->getNameWithState());

+       if (part->getState() != DataPartState::Temporary)
+       {
            auto it = data_parts_by_info.find(part->info);
            if (it == data_parts_by_info.end() || (*it).get() != part.get())
                throw Exception("Part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
@@ -2603,8 +2605,14 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
                return;

            modifyPartState(it, DataPartState::Deleting);
            part_to_delete = *it;
        }
+       else
+       {
+           part_to_delete = std::move(part);
+       }
    }

    try
    {
View File

@ -783,7 +783,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge); auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge);
auto tmp_disk = context->getTemporaryVolume()->getDisk(); auto tmp_disk = context->getTemporaryVolume()->getDisk();
String rows_sources_file_path; std::unique_ptr<TemporaryFile> rows_sources_file;
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf; std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf; std::unique_ptr<WriteBuffer> rows_sources_write_buf;
std::optional<ColumnSizeEstimator> column_sizes; std::optional<ColumnSizeEstimator> column_sizes;
@ -792,9 +792,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (chosen_merge_algorithm == MergeAlgorithm::Vertical) if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
{ {
tmp_disk->createDirectories(new_part_tmp_path); rows_sources_file = createTemporaryFile(tmp_disk->getPath());
rows_sources_file_path = new_part_tmp_path + "rows_sources"; rows_sources_uncompressed_write_buf = tmp_disk->writeFile(fileName(rows_sources_file->path()));
rows_sources_uncompressed_write_buf = tmp_disk->writeFile(rows_sources_file_path);
rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf); rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
MergeTreeData::DataPart::ColumnToSize merged_column_to_size; MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
@ -1030,7 +1029,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count) + ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR); + "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(rows_sources_file_path)); CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(fileName(rows_sources_file->path())));
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns; IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size(); for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
@ -1101,8 +1100,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes; merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed); merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed);
} }
tmp_disk->removeFile(rows_sources_file_path);
} }
for (const auto & part : parts) for (const auto & part : parts)
