Merge branch 'ClickHouse:master' into master

commit 79aaca5185
Author: OnePiece
Date: 2021-08-31 10:39:49 +08:00 (committed by GitHub)
131 changed files with 2077 additions and 244 deletions


@ -98,6 +98,8 @@ RUN set -x \
&& echo 'dockremap:165536:65536' >> /etc/subuid \
&& echo 'dockremap:165536:65536' >> /etc/subgid
RUN echo '127.0.0.1 localhost test.com' >> /etc/hosts
EXPOSE 2375
ENTRYPOINT ["dockerd-entrypoint.sh"]
CMD ["sh", "-c", "pytest $PYTEST_OPTS"]


@ -0,0 +1,11 @@
version: '2.3'
services:
# nginx server to host static files.
# Accepts PUT requests only at test.com/path, and GET requests for already existing data at test.com/path.
# Files will be put into /usr/share/nginx/files.
nginx:
image: kssenii/nginx-test:1.1
restart: always
ports:
- 80:80
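An illustrative smoke test for this container (these commands are assumptions, not part of the commit; they require `test.com` to resolve to the nginx container, as arranged by the `/etc/hosts` line in the Dockerfile hunk above):
``` bash
# store a file, then read it back
curl -X PUT --data-binary 'hello' http://test.com/path/file.txt
curl http://test.com/path/file.txt
```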


@ -58,7 +58,7 @@ function start()
echo "Cannot start clickhouse-server"
cat /var/log/clickhouse-server/stdout.log
tail -n1000 /var/log/clickhouse-server/stderr.log
tail -n100000 /var/log/clickhouse-server/clickhouse-server.log | grep -F -v '<Warning> RaftInstance:' -e '<Information> RaftInstance' | tail -n1000
tail -n100000 /var/log/clickhouse-server/clickhouse-server.log | grep -F -v -e '<Warning> RaftInstance:' -e '<Information> RaftInstance' | tail -n1000
break
fi
# use root to match with current uid
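The fix adds the missing `-e` before the first pattern: once any pattern is supplied with `-e`, grep treats bare positional arguments as file names, so the old command tried to open '<Warning> RaftInstance:' as a file instead of excluding it. A minimal illustration (not from the commit):
``` bash
# without an explicit -e, the first string would be parsed as a FILE operand, not a pattern
printf 'foo\nbar\nbaz\n' | grep -F -v -e 'foo' -e 'bar'
# prints: baz
```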


@ -184,9 +184,10 @@ Similar to GraphiteMergeTree, the HDFS engine supports extended configuration us
|hadoop\_kerberos\_keytab | "" |
|hadoop\_kerberos\_principal | "" |
|hadoop\_kerberos\_kinit\_command | kinit |
|libhdfs3\_conf | "" |
### Limitations {#limitations}
* hadoop\_security\_kerberos\_ticket\_cache\_path can be global only, not user specific
* hadoop\_security\_kerberos\_ticket\_cache\_path and libhdfs3\_conf can be global only, not user specific
## Kerberos support {#kerberos-support}
@ -198,6 +199,22 @@ security approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_c
If hadoop\_kerberos\_keytab, hadoop\_kerberos\_principal or hadoop\_kerberos\_kinit\_command is specified, kinit will be invoked. hadoop\_kerberos\_keytab and hadoop\_kerberos\_principal are mandatory in this case. The kinit tool and krb5 configuration files are required.
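For instance, a configuration sketch (the keytab path and principal are placeholders, not values from this commit; the parameters go in the global `<hdfs>` section mentioned above):
``` xml
<hdfs>
    <hadoop_kerberos_keytab>/etc/clickhouse-server/hdfs.keytab</hadoop_kerberos_keytab>
    <hadoop_kerberos_principal>clickhouse@EXAMPLE.COM</hadoop_kerberos_principal>
</hdfs>
```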
## HDFS Namenode HA support {#namenode-ha}
libhdfs3 supports HDFS namenode HA.
- Copy `hdfs-site.xml` from an HDFS node to `/etc/clickhouse-server/`.
- Add the following piece to the ClickHouse config file:
``` xml
<hdfs>
<libhdfs3_conf>/etc/clickhouse-server/hdfs-site.xml</libhdfs3_conf>
</hdfs>
```
- Then use `dfs.nameservices` tag value of `hdfs-site.xml` as the namenode address in the HDFS URI. For example, replace `hdfs://appadmin@192.168.101.11:8020/abc/` with `hdfs://appadmin@my_nameservice/abc/`.
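For example, a table over an HA nameservice could be defined like this (a sketch; the table definition and file path are illustrative, only the URI form comes from the text above):
``` sql
CREATE TABLE hdfs_ha_table (name String, value UInt32)
ENGINE = HDFS('hdfs://appadmin@my_nameservice/abc/data.tsv', 'TSV')
```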
## Virtual Columns {#virtual-columns}
- `_path` — Path to the file.


@ -44,4 +44,10 @@ Restrictions:
- some data types are sent as strings
To cancel a long-running query, use the `KILL QUERY connection_id` statement (it is rewritten to `KILL QUERY WHERE query_id = connection_id` during execution). For example:
``` bash
$ mysql --protocol tcp -h mysql_server -P 9004 default -u default --password=123 -e "KILL QUERY 123456;"
```
[Original article](https://clickhouse.tech/docs/en/interfaces/mysql/) <!--hide-->


@ -2236,6 +2236,53 @@ defaultRoles()
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
## getServerPort {#getserverport}
Returns the server port number. If the port is not used by the server, an exception is thrown.
**Syntax**
``` sql
getServerPort(port_name)
```
**Arguments**
- `port_name` — The name of the server port. [String](../../sql-reference/data-types/string.md#string). Possible values:
- 'tcp_port'
- 'tcp_port_secure'
- 'http_port'
- 'https_port'
- 'interserver_http_port'
- 'interserver_https_port'
- 'mysql_port'
- 'postgresql_port'
- 'grpc_port'
- 'prometheus.port'
**Returned value**
- The number of the server port.
Type: [UInt16](../../sql-reference/data-types/int-uint.md).
**Example**
Query:
``` sql
SELECT getServerPort('tcp_port');
```
Result:
``` text
┌─getServerPort('tcp_port')─┐
│ 9000 │
└───────────────────────────┘
```
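The same call works for any of the listed port names; assuming the default configuration, where the HTTP port is 8123:
``` sql
SELECT getServerPort('http_port');
```
``` text
┌─getServerPort('http_port')─┐
│ 8123 │
└────────────────────────────┘
```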
## queryID {#query-id}
Returns the ID of the current query. Other parameters of a query can be extracted from the [system.query_log](../../operations/system-tables/query_log.md) table via `query_id`.


@ -43,3 +43,9 @@ mysql>
- prepared statements are not supported
- some data types are sent as strings
To cancel a long-running query, use the `KILL QUERY connection_id` statement (it is rewritten to `KILL QUERY WHERE query_id = connection_id` during execution). For example:
``` bash
$ mysql --protocol tcp -h mysql_server -P 9004 default -u default --password=123 -e "KILL QUERY 123456;"
```


@ -2186,6 +2186,53 @@ defaultRoles()
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
## getServerPort {#getserverport}
Returns the server port number. If the port is not used by the server, an exception is thrown.
**Syntax**
``` sql
getServerPort(port_name)
```
**Arguments**
- `port_name` — The name of the server port. [String](../../sql-reference/data-types/string.md#string). Possible values:
- 'tcp_port'
- 'tcp_port_secure'
- 'http_port'
- 'https_port'
- 'interserver_http_port'
- 'interserver_https_port'
- 'mysql_port'
- 'postgresql_port'
- 'grpc_port'
- 'prometheus.port'
**Returned value**
- The server port number.
Type: [UInt16](../../sql-reference/data-types/int-uint.md).
**Example**
Query:
``` sql
SELECT getServerPort('tcp_port');
```
Result:
``` text
┌─getServerPort('tcp_port')─┐
│ 9000 │
└───────────────────────────┘
```
## queryID {#query-id}
Returns the ID of the current query. Other query parameters can be extracted from the [system.query_log](../../operations/system-tables/query_log.md) table via `query_id`.


@ -5,7 +5,7 @@ machine_translated_rev: 5decc73b5dc60054f19087d3690c4eb99446a6c3
# IN Operators {#select-in-operators}
The `IN`, `NOT IN`, `GLOBAL IN`, and `GLOBAL NOT IN` operators are covered separately, since their functionality is quite rich.
The `IN`, `NOT IN`, `GLOBAL IN`, and `GLOBAL NOT IN` operators are considered separately, since their functionality is quite rich.
The left side of the operator is either a single column or a tuple.


@ -45,9 +45,9 @@ option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Li
${ENABLE_CLICKHOUSE_ALL})
# https://presentations.clickhouse.tech/matemarketing_2020/
option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories"
${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER "A tool to export table data files to be later put to a static files web server" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_KEEPER "ClickHouse alternative to ZooKeeper" ${ENABLE_CLICKHOUSE_ALL})
@ -227,6 +227,7 @@ add_subdirectory (obfuscator)
add_subdirectory (install)
add_subdirectory (git-import)
add_subdirectory (bash-completion)
add_subdirectory (static-files-disk-uploader)
if (ENABLE_CLICKHOUSE_KEEPER)
add_subdirectory (keeper)
@ -258,7 +259,8 @@ if (CLICKHOUSE_ONE_SHARED)
${CLICKHOUSE_GIT_IMPORT_SOURCES}
${CLICKHOUSE_ODBC_BRIDGE_SOURCES}
${CLICKHOUSE_KEEPER_SOURCES}
${CLICKHOUSE_KEEPER_CONVERTER_SOURCES})
${CLICKHOUSE_KEEPER_CONVERTER_SOURCES}
${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_SOURCES})
target_link_libraries(clickhouse-lib
${CLICKHOUSE_SERVER_LINK}
@ -273,7 +275,8 @@ if (CLICKHOUSE_ONE_SHARED)
${CLICKHOUSE_GIT_IMPORT_LINK}
${CLICKHOUSE_ODBC_BRIDGE_LINK}
${CLICKHOUSE_KEEPER_LINK}
${CLICKHOUSE_KEEPER_CONVERTER_LINK})
${CLICKHOUSE_KEEPER_CONVERTER_LINK}
${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_LINK})
target_include_directories(clickhouse-lib
${CLICKHOUSE_SERVER_INCLUDE}
@ -306,6 +309,7 @@ if (CLICKHOUSE_SPLIT_BINARY)
clickhouse-obfuscator
clickhouse-git-import
clickhouse-copier
clickhouse-static-files-disk-uploader
)
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
@ -371,6 +375,9 @@ else ()
if (ENABLE_CLICKHOUSE_GIT_IMPORT)
clickhouse_target_link_split_lib(clickhouse git-import)
endif ()
if (ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER)
clickhouse_target_link_split_lib(clickhouse static-files-disk-uploader)
endif ()
if (ENABLE_CLICKHOUSE_KEEPER)
clickhouse_target_link_split_lib(clickhouse keeper)
endif()
@ -432,6 +439,11 @@ else ()
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-git-import" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-git-import)
endif ()
if (ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER)
add_custom_target (clickhouse-static-files-disk-uploader ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-static-files-disk-uploader DEPENDS clickhouse)
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-static-files-disk-uploader" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-static-files-disk-uploader)
endif ()
if (ENABLE_CLICKHOUSE_KEEPER)
add_custom_target (clickhouse-keeper ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-keeper DEPENDS clickhouse)
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-keeper" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)


@ -18,3 +18,4 @@
#cmakedefine01 ENABLE_CLICKHOUSE_LIBRARY_BRIDGE
#cmakedefine01 ENABLE_CLICKHOUSE_KEEPER
#cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER
#cmakedefine01 ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER


@ -62,6 +62,9 @@ int mainEntryClickHouseKeeper(int argc, char ** argv);
#if ENABLE_CLICKHOUSE_KEEPER
int mainEntryClickHouseKeeperConverter(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER
int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_INSTALL
int mainEntryClickHouseInstall(int argc, char ** argv);
int mainEntryClickHouseStart(int argc, char ** argv);
@ -131,6 +134,9 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
{"stop", mainEntryClickHouseStop},
{"status", mainEntryClickHouseStatus},
{"restart", mainEntryClickHouseRestart},
#endif
#if ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER
{"static-files-disk-uploader", mainEntryClickHouseStaticFilesDiskUploader},
#endif
{"hash-binary", mainEntryClickHouseHashBinary},
};


@ -0,0 +1,9 @@
set (CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_SOURCES static-files-disk-uploader.cpp)
set (CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_LINK
PRIVATE
boost::program_options
dbms
)
clickhouse_program_add(static-files-disk-uploader)


@ -0,0 +1,2 @@
int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseStaticFilesDiskUploader(argc_, argv_); }


@ -0,0 +1,162 @@
#include <Common/Exception.h>
#include <Common/TerminalSize.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <IO/createReadBufferFromFileBase.h>
#include <boost/program_options.hpp>
#include <re2/re2.h>
#include <filesystem>
namespace fs = std::filesystem;
#define UUID_PATTERN "[\\w]{8}-[\\w]{4}-[\\w]{4}-[\\w]{4}-[\\w]{12}"
#define EXTRACT_UUID_PATTERN fmt::format(".*/({})/.*", UUID_PATTERN)
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/*
* A tool to collect table data files on the local fs as is (into the current directory or into the path from the --output-dir option).
* If the --test-mode option is given, files are uploaded to the given url via PUT requests.
*/
void processTableFiles(const fs::path & path, const String & files_prefix, String uuid,
WriteBuffer & metadata_buf, std::function<std::shared_ptr<WriteBuffer>(const String &)> create_dst_buf)
{
fs::directory_iterator dir_end;
auto process_file = [&](const String & file_name, const String & file_path)
{
auto remote_file_name = files_prefix + "-" + uuid + "-" + file_name;
writeText(remote_file_name, metadata_buf);
writeChar('\t', metadata_buf);
writeIntText(fs::file_size(file_path), metadata_buf);
writeChar('\n', metadata_buf);
auto src_buf = createReadBufferFromFileBase(file_path, fs::file_size(file_path), 0, 0, nullptr);
auto dst_buf = create_dst_buf(remote_file_name);
copyData(*src_buf, *dst_buf);
dst_buf->next();
dst_buf->finalize();
};
for (fs::directory_iterator dir_it(path); dir_it != dir_end; ++dir_it)
{
if (dir_it->is_directory())
{
fs::directory_iterator files_end;
for (fs::directory_iterator file_it(dir_it->path()); file_it != files_end; ++file_it)
process_file(dir_it->path().filename().string() + "-" + file_it->path().filename().string(), file_it->path());
}
else
{
process_file(dir_it->path().filename(), dir_it->path());
}
}
}
}
int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv)
try
{
using namespace DB;
namespace po = boost::program_options;
po::options_description description("Allowed options", getTerminalWidth());
description.add_options()
("help,h", "produce help message")
("metadata-path", po::value<std::string>(), "Metadata path (select data_paths from system.tables where name='table_name')")
("test-mode", "Use test mode, which uploads data to the given url via PUT requests")
("url", po::value<std::string>(), "Web server url for test mode")
("output-dir", po::value<std::string>(), "Directory to put files in non-test mode")
("files-prefix", po::value<std::string>(), "Prefix for stored files");
po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
po::variables_map options;
po::store(parsed, options);
po::notify(options);
if (options.empty() || options.count("help"))
{
std::cout << description << std::endl;
exit(0);
}
String url, metadata_path, files_prefix;
if (options.count("metadata-path"))
metadata_path = options["metadata-path"].as<std::string>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No metadata-path option passed");
if (options.count("files-prefix"))
files_prefix = options["files-prefix"].as<std::string>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No files-prefix option passed");
fs::path fs_path = fs::weakly_canonical(metadata_path);
if (!fs::exists(fs_path))
{
std::cerr << fmt::format("Data path ({}) does not exist", fs_path.string());
return 1;
}
String uuid;
if (!RE2::Extract(metadata_path, EXTRACT_UUID_PATTERN, "\\1", &uuid))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract uuid for: {}", metadata_path);
std::shared_ptr<WriteBuffer> metadata_buf;
std::function<std::shared_ptr<WriteBuffer>(const String &)> create_dst_buf;
String root_path;
if (options.count("test-mode"))
{
if (options.count("url"))
url = options["url"].as<std::string>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No url option passed for test mode");
metadata_buf = std::make_shared<WriteBufferFromHTTP>(Poco::URI(fs::path(url) / (".index-" + uuid)), Poco::Net::HTTPRequest::HTTP_PUT);
create_dst_buf = [&](const String & remote_file_name)
{
return std::make_shared<WriteBufferFromHTTP>(Poco::URI(fs::path(url) / remote_file_name), Poco::Net::HTTPRequest::HTTP_PUT);
};
}
else
{
if (options.count("output-dir"))
root_path = options["output-dir"].as<std::string>();
else
root_path = fs::current_path();
metadata_buf = std::make_shared<WriteBufferFromFile>(fs::path(root_path) / (".index-" + uuid));
create_dst_buf = [&](const String & remote_file_name)
{
return std::make_shared<WriteBufferFromFile>(fs::path(root_path) / remote_file_name);
};
}
processTableFiles(fs_path, files_prefix, uuid, *metadata_buf, create_dst_buf);
metadata_buf->next();
metadata_buf->finalize();
return 0;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(false);
return 1;
}
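A usage sketch matching the options parsed above (the `<path>`/`<dir>` placeholders follow the tool's own documentation in DiskWebServer.h below; `http://test.com/path` is the illustrative endpoint from the integration-test nginx container):
``` bash
# local mode: write data files plus an .index-<uuid> metadata file into <dir>
clickhouse static-files-disk-uploader --metadata-path <path> --output-dir <dir> --files-prefix data

# test mode: PUT the same files to a web server instead
clickhouse static-files-disk-uploader --metadata-path <path> --test-mode --url http://test.com/path --files-prefix data
```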


@ -23,6 +23,7 @@ SRCS(
client/QueryFuzzer.cpp
client/ConnectionParameters.cpp
client/Suggest.cpp
client/TestHint.cpp
extract-from-config/ExtractFromConfig.cpp
server/Server.cpp
server/MetricsTransmitter.cpp


@ -8,7 +8,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()
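The `<? ... ?>` construct is a shell snippet that `utils/generate-ya-make` expands into the SRCS list (the generated files below say as much in their header); run on its own from the library directory it would print that list:
``` bash
find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort
```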


@ -48,6 +48,8 @@ struct AggregateFunctionSequenceMatchData final
bool sorted = true;
PODArrayWithStackMemory<TimestampEvents, 64> events_list;
/// sequenceMatch conditions met at least once in events_list
std::bitset<max_events> conditions_met;
void add(const Timestamp timestamp, const Events & events)
{
@ -56,6 +58,7 @@ struct AggregateFunctionSequenceMatchData final
{
events_list.emplace_back(timestamp, events);
sorted = false;
conditions_met |= events;
}
}
@ -64,29 +67,9 @@ struct AggregateFunctionSequenceMatchData final
if (other.events_list.empty())
return;
const auto size = events_list.size();
events_list.insert(std::begin(other.events_list), std::end(other.events_list));
/// either sort whole container or do so partially merging ranges afterwards
if (!sorted && !other.sorted)
std::sort(std::begin(events_list), std::end(events_list), Comparator{});
else
{
const auto begin = std::begin(events_list);
const auto middle = std::next(begin, size);
const auto end = std::end(events_list);
if (!sorted)
std::sort(begin, middle, Comparator{});
if (!other.sorted)
std::sort(middle, end, Comparator{});
std::inplace_merge(begin, middle, end, Comparator{});
}
sorted = true;
sorted = false;
conditions_met |= other.conditions_met;
}
void sort()
@ -290,6 +273,7 @@ private:
dfa_states.back().transition = DFATransition::SpecificEvent;
dfa_states.back().event = event_number - 1;
dfa_states.emplace_back();
conditions_in_pattern.set(event_number - 1);
}
if (!match(")"))
@ -518,6 +502,64 @@ protected:
return action_it == action_end;
}
/// Splits the pattern into deterministic parts separated by non-deterministic fragments
/// (time constraints and Kleene stars), and tries to match the deterministic parts in their specified order,
/// ignoring the non-deterministic fragments.
/// This function can quickly check that a full match is not possible if some deterministic fragment is missing.
template <typename EventEntry>
bool couldMatchDeterministicParts(const EventEntry events_begin, const EventEntry events_end, bool limit_iterations = true) const
{
size_t events_processed = 0;
auto events_it = events_begin;
const auto actions_end = std::end(actions);
auto actions_it = std::begin(actions);
auto det_part_begin = actions_it;
auto match_deterministic_part = [&events_it, events_end, &events_processed, det_part_begin, actions_it, limit_iterations]()
{
auto events_it_init = events_it;
auto det_part_it = det_part_begin;
while (det_part_it != actions_it && events_it != events_end)
{
/// matching any event
if (det_part_it->type == PatternActionType::AnyEvent)
++events_it, ++det_part_it;
/// matching specific event
else
{
if (events_it->second.test(det_part_it->extra))
++events_it, ++det_part_it;
/// abandon current matching, try to match the deterministic fragment further in the list
else
{
events_it = ++events_it_init;
det_part_it = det_part_begin;
}
}
if (limit_iterations && ++events_processed > sequence_match_max_iterations)
throw Exception{"Pattern application proves too difficult, exceeding max iterations (" + toString(sequence_match_max_iterations) + ")",
ErrorCodes::TOO_SLOW};
}
return det_part_it == actions_it;
};
for (; actions_it != actions_end; ++actions_it)
if (actions_it->type != PatternActionType::SpecificEvent && actions_it->type != PatternActionType::AnyEvent)
{
if (!match_deterministic_part())
return false;
det_part_begin = std::next(actions_it);
}
return match_deterministic_part();
}
private:
enum class DFATransition : char
{
@ -558,6 +600,8 @@ private:
protected:
/// `True` if the parsed pattern contains time assertions (?t...), `false` otherwise.
bool pattern_has_time;
/// sequenceMatch conditions met at least once in the pattern
std::bitset<max_events> conditions_in_pattern;
private:
std::string pattern;
@ -584,6 +628,12 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & output = assert_cast<ColumnUInt8 &>(to).getData();
if ((this->conditions_in_pattern & this->data(place).conditions_met) != this->conditions_in_pattern)
{
output.push_back(false);
return;
}
this->data(place).sort();
const auto & data_ref = this->data(place);
@ -592,8 +642,10 @@ public:
const auto events_end = std::end(data_ref.events_list);
auto events_it = events_begin;
bool match = this->pattern_has_time ? this->backtrackingMatch(events_it, events_end) : this->dfaMatch(events_it, events_end);
assert_cast<ColumnUInt8 &>(to).getData().push_back(match);
bool match = (this->pattern_has_time ?
(this->couldMatchDeterministicParts(events_begin, events_end) && this->backtrackingMatch(events_it, events_end)) :
this->dfaMatch(events_it, events_end));
output.push_back(match);
}
};
@ -614,8 +666,14 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & output = assert_cast<ColumnUInt64 &>(to).getData();
if ((this->conditions_in_pattern & this->data(place).conditions_met) != this->conditions_in_pattern)
{
output.push_back(0);
return;
}
this->data(place).sort();
assert_cast<ColumnUInt64 &>(to).getData().push_back(count(place));
output.push_back(count(place));
}
private:
@ -628,8 +686,12 @@ private:
auto events_it = events_begin;
size_t count = 0;
while (events_it != events_end && this->backtrackingMatch(events_it, events_end))
++count;
// check if there is a chance of matching the sequence at least once
if (this->couldMatchDeterministicParts(events_begin, events_end))
{
while (events_it != events_end && this->backtrackingMatch(events_it, events_end))
++count;
}
return count;
}
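A hedged illustration of the new fast path (table and columns are hypothetical): the pattern below needs both conditions to fire at least once, so any input whose rows never satisfy `number = 2` is now rejected via the `conditions_met` bitset before the expensive backtracking match runs.
``` sql
SELECT sequenceMatch('(?1)(?t>1000)(?2)')(time, number = 1, number = 2)
FROM t
```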


@ -48,6 +48,7 @@ SRCS(
AggregateFunctionSequenceNextNode.cpp
AggregateFunctionSimpleLinearRegression.cpp
AggregateFunctionSimpleState.cpp
AggregateFunctionSingleValueOrNull.cpp
AggregateFunctionState.cpp
AggregateFunctionStatistics.cpp
AggregateFunctionStatisticsSimple.cpp


@ -8,7 +8,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F GroupBitmap | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -F GroupBitmap | sed 's/^\.\// /' | sort ?>
)
END()

src/Backups/ya.make (new file, 27 lines)

@ -0,0 +1,27 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
OWNER(g:clickhouse)
LIBRARY()
PEERDIR(
clickhouse/src/Common
)
SRCS(
BackupEntryConcat.cpp
BackupEntryFromAppendOnlyFile.cpp
BackupEntryFromImmutableFile.cpp
BackupEntryFromMemory.cpp
BackupEntryFromSmallFile.cpp
BackupFactory.cpp
BackupInDirectory.cpp
BackupRenamingConfig.cpp
BackupSettings.cpp
BackupUtils.cpp
hasCompatibleDataToRestoreTable.cpp
renameInCreateQuery.cpp
)
END()

src/Backups/ya.make.in (new file, 14 lines)

@ -0,0 +1,14 @@
OWNER(g:clickhouse)
LIBRARY()
PEERDIR(
clickhouse/src/Common
)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -8,7 +8,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -9,7 +9,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -17,7 +17,7 @@ PEERDIR(
)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -2,11 +2,21 @@
#include <queue>
#include <type_traits>
#include <atomic>
#include <Poco/Mutex.h>
#include <Poco/Semaphore.h>
#include <common/MoveOrCopyIfThrow.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/** A very simple thread-safe queue of limited size.
* If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty.
@ -17,9 +27,41 @@ class ConcurrentBoundedQueue
{
private:
std::queue<T> queue;
Poco::FastMutex mutex;
mutable Poco::FastMutex mutex;
Poco::Semaphore fill_count;
Poco::Semaphore empty_count;
std::atomic_bool closed = false;
template <typename... Args>
bool tryEmplaceImpl(Args &&... args)
{
bool emplaced = true;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (closed)
emplaced = false;
else
queue.emplace(std::forward<Args>(args)...);
}
if (emplaced)
fill_count.set();
else
empty_count.set();
return emplaced;
}
void popImpl(T & x)
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
empty_count.set();
}
public:
explicit ConcurrentBoundedQueue(size_t max_fill)
@ -30,91 +72,75 @@ public:
void push(const T & x)
{
empty_count.wait();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.push(x);
}
fill_count.set();
if (!tryEmplaceImpl(x))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "tryPush/tryEmplace must be used with close()");
}
template <typename... Args>
void emplace(Args &&... args)
{
empty_count.wait();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.emplace(std::forward<Args>(args)...);
}
fill_count.set();
if (!tryEmplaceImpl(std::forward<Args>(args)...))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "tryPush/tryEmplace must be used with close()");
}
void pop(T & x)
{
fill_count.wait();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
empty_count.set();
popImpl(x);
}
bool tryPush(const T & x, UInt64 milliseconds = 0)
{
if (empty_count.tryWait(milliseconds))
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.push(x);
}
fill_count.set();
return true;
}
return false;
if (!empty_count.tryWait(milliseconds))
return false;
return tryEmplaceImpl(x);
}
template <typename... Args>
bool tryEmplace(UInt64 milliseconds, Args &&... args)
{
if (empty_count.tryWait(milliseconds))
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.emplace(std::forward<Args>(args)...);
}
fill_count.set();
return true;
}
return false;
if (!empty_count.tryWait(milliseconds))
return false;
return tryEmplaceImpl(std::forward<Args>(args)...);
}
bool tryPop(T & x, UInt64 milliseconds = 0)
{
if (fill_count.tryWait(milliseconds))
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
empty_count.set();
return true;
}
return false;
if (!fill_count.tryWait(milliseconds))
return false;
popImpl(x);
return true;
}
size_t size()
size_t size() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return queue.size();
}
size_t empty()
size_t empty() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return queue.empty();
}
/// Forbids pushing new elements to the queue.
/// Returns false if queue was not closed before call, returns true if queue was already closed.
bool close()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return closed.exchange(true);
}
bool isClosed() const
{
return closed.load();
}
void clear()
{
while (fill_count.tryWait(0))

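The `close()` contract above suggests a shutdown pattern along these lines (a hypothetical usage sketch, not code from this commit):
``` cpp
#include <thread>
#include <Common/ConcurrentBoundedQueue.h>

void example()
{
    ConcurrentBoundedQueue<int> queue(16);

    std::thread producer([&]
    {
        for (int i = 0; i < 100; ++i)
            if (!queue.tryPush(i, /* milliseconds = */ 100))
                return; /// fails once the queue is closed (or if it stays full)
    });

    int value;
    for (int i = 0; i < 100; ++i)
        queue.pop(value); /// blocks until an element is available

    queue.close(); /// one-shot: push()/emplace() now throw, tryPush() returns false
    producer.join();
}
```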

@ -540,7 +540,7 @@ void ZooKeeper::sendThread()
try
{
while (!expired)
while (!requests_queue.isClosed())
{
auto prev_bytes_sent = out->count();
@ -572,7 +572,7 @@ void ZooKeeper::sendThread()
info.request->has_watch = true;
}
if (expired)
if (requests_queue.isClosed())
{
break;
}
@ -617,7 +617,7 @@ void ZooKeeper::receiveThread()
try
{
Int64 waited = 0;
while (!expired)
while (!requests_queue.isClosed())
{
auto prev_bytes_received = in->count();
@ -640,7 +640,7 @@ void ZooKeeper::receiveThread()
if (in->poll(max_wait))
{
if (expired)
if (requests_queue.isClosed())
break;
receiveEvent();
@ -840,12 +840,10 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
auto expire_session_if_not_expired = [&]
{
std::lock_guard lock(push_request_mutex);
if (!expired)
{
expired = true;
/// No new requests will appear in queue after close()
bool was_already_closed = requests_queue.close();
if (!was_already_closed)
active_session_metric_increment.destroy();
}
};
try
@ -1018,17 +1016,15 @@ void ZooKeeper::pushRequest(RequestInfo && info)
}
}
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
/// to avoid forgotten operations in the queue when session is expired.
/// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
/// and the queue will be drained in 'finalize'.
std::lock_guard lock(push_request_mutex);
if (expired)
if (requests_queue.isClosed())
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
{
if (requests_queue.isClosed())
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
}
}
catch (...)
{


@ -121,7 +121,7 @@ public:
/// If expired, you can only destroy the object. All other methods will throw exception.
bool isExpired() const override { return expired; }
bool isExpired() const override { return requests_queue.isClosed(); }
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
@ -207,11 +207,9 @@ private:
int64_t session_id = 0;
std::atomic<XID> next_xid {1};
std::atomic<bool> expired {false};
/// Mark session finalization start. Used to avoid simultaneous
/// finalization from different threads. One-shot flag.
std::atomic<bool> finalization_started {false};
std::mutex push_request_mutex;
using clock = std::chrono::steady_clock;
@ -225,7 +223,7 @@ private:
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
RequestsQueue requests_queue{1024};
void pushRequest(RequestInfo && info);
using Operations = std::map<XID, RequestInfo>;


@ -118,7 +118,6 @@ SRCS(
isLocalAddress.cpp
isValidUTF8.cpp
malloc.cpp
memory.cpp
new_delete.cpp
parseAddress.cpp
parseGlobs.cpp


@ -24,7 +24,7 @@ INCLUDE(${ARCADIA_ROOT}/clickhouse/cmake/yandex/ya.make.versions.inc)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -32,6 +32,7 @@ SRCS(
CompressionCodecT64.cpp
CompressionCodecZSTD.cpp
CompressionFactory.cpp
CompressionFactoryAdditions.cpp
ICompressionCodec.cpp
LZ4_decompress_faster.cpp
getCompressionCodecForFile.cpp


@ -15,7 +15,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -12,7 +12,6 @@
#include <IO/LimitReadBuffer.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>


@ -74,6 +74,7 @@ class IColumn;
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(UInt64, http_max_single_read_retries, 4, "The maximum number of retries during single http read.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \


@ -37,6 +37,7 @@ SRCS(
PostgreSQL/insertPostgreSQLValue.cpp
PostgreSQLProtocol.cpp
QueryProcessingStage.cpp
ServerUUID.cpp
Settings.cpp
SettingsEnums.cpp
SettingsFields.cpp


@ -10,7 +10,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -44,6 +44,7 @@ SRCS(
SquashingTransform.cpp
TTLAggregationAlgorithm.cpp
TTLBlockInputStream.cpp
TTLCalcInputStream.cpp
TTLColumnAlgorithm.cpp
TTLDeleteAlgorithm.cpp
TTLUpdateInfoAlgorithm.cpp


@ -9,7 +9,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()


@ -416,7 +416,8 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
return UUIDHelpers::Nil;
}
void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
if (has_force_restore_data_flag)
@ -433,7 +434,7 @@ void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has
}
}
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
if (has_force_restore_data_flag)
{


@ -47,7 +47,7 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
/// Atomic database cannot be detached if there is detached table which still in use
void assertCanBeDetached(bool cleanup) override;


@ -36,9 +36,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
void DatabaseLazy::loadStoredObjects(
ContextMutablePtr local_context,
bool /* has_force_restore_data_flag */,
bool /*force_attach*/)
ContextMutablePtr local_context, bool /* has_force_restore_data_flag */, bool /*force_attach*/, bool /* skip_startup_tables */)
{
iterateMetadataFiles(local_context, [this](const String & file_name)
{
@ -246,6 +244,8 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
if (!ast || !endsWith(table->getName(), "Log"))
throw Exception("Only *Log tables can be used with Lazy database engine.", ErrorCodes::LOGICAL_ERROR);
table->startup();
{
std::lock_guard lock(mutex);
auto it = tables_cache.find(table_name);


@ -26,9 +26,7 @@ public:
bool canContainDistributedTables() const override { return false; }
void loadStoredObjects(
ContextMutablePtr context,
bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void createTable(
ContextPtr context,


@ -83,7 +83,8 @@ DatabaseOrdinary::DatabaseOrdinary(
{
}
void DatabaseOrdinary::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/)
void DatabaseOrdinary::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
@ -201,12 +202,20 @@ void DatabaseOrdinary::loadStoredObjects(ContextMutablePtr local_context, bool h
pool.wait();
/// After all tables was basically initialized, startup them.
startupTables(pool);
if (!skip_startup_tables)
{
/// After all tables have been basically initialized, start them up.
startupTablesImpl(pool);
}
}
void DatabaseOrdinary::startupTables()
{
ThreadPool pool;
startupTablesImpl(pool);
}
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
{
LOG_INFO(log, "Starting up tables.");


@ -20,7 +20,9 @@ public:
String getEngineName() const override { return "Ordinary"; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void startupTables() override;
void alterTable(
ContextPtr context,
@ -35,7 +37,7 @@ protected:
const String & statement,
ContextPtr query_context);
void startupTables(ThreadPool & thread_pool);
void startupTablesImpl(ThreadPool & thread_pool);
};
}


@ -305,11 +305,12 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
createEmptyLogEntry(current_zookeeper);
}
void DatabaseReplicated::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
tryConnectToZooKeeperAndInitDatabase(force_attach);
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
ddl_worker->startup();


@ -57,7 +57,7 @@ public:
void drop(ContextPtr /*context*/) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void shutdown() override;
friend struct DatabaseReplicatedTask;


@ -123,7 +123,15 @@ public:
/// Load a set of existing tables.
/// Can be called only once, right after the object is created.
virtual void loadStoredObjects(ContextMutablePtr /*context*/, bool /*has_force_restore_data_flag*/, bool /*force_attach*/ = false) {}
virtual void loadStoredObjects(
ContextMutablePtr /*context*/,
bool /*has_force_restore_data_flag*/,
bool /*force_attach*/ = false,
bool /* skip_startup_tables */ = false)
{
}
virtual void startupTables() {}
/// Check the existence of the table.
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;
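Taken together with the `DatabaseOrdinary` change above, the new flag suggests a two-phase startup (a hypothetical caller sketch, not code from this commit):
``` cpp
/// Phase 1: load metadata for every database, deferring table startup.
database->loadStoredObjects(context, has_force_restore_data_flag, force_attach, /* skip_startup_tables = */ true);

/// Phase 2: once all databases are loaded, start the tables.
database->startupTables();
```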


@ -93,10 +93,11 @@ void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & ex
exception = exception_;
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach)
template <typename Base>
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(
ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach);
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach, skip_startup_tables);
if (!force_attach)
materialize_thread.assertMySQLAvailable();


@ -43,7 +43,7 @@ protected:
public:
String getEngineName() const override { return "MaterializedMySQL"; }
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;


@ -405,7 +405,7 @@ String DatabaseMySQL::getMetadataPath() const
return metadata_path;
}
void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, bool, bool /*force_attach*/)
void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, bool, bool /*force_attach*/, bool /* skip_startup_tables */)
{
std::lock_guard<std::mutex> lock{mutex};


@ -75,7 +75,7 @@ public:
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
StoragePtr detachTable(const String & table_name) override;


@ -109,9 +109,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
}
void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
void DatabaseMaterializedPostgreSQL::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
try
{
@ -124,7 +125,6 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_c
if (!force_attach)
throw;
}
}


@ -43,10 +43,10 @@ public:
String getMetadataPath() const override { return metadata_path; }
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
DatabaseTablesIteratorPtr getTablesIterator(
ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;


@ -280,7 +280,7 @@ void DatabasePostgreSQL::drop(ContextPtr /*context*/)
}
void DatabasePostgreSQL::loadStoredObjects(ContextMutablePtr /* context */, bool, bool /*force_attach*/)
void DatabasePostgreSQL::loadStoredObjects(ContextMutablePtr /* context */, bool, bool /*force_attach*/, bool /* skip_startup_tables */)
{
{
std::lock_guard<std::mutex> lock{mutex};


@ -48,7 +48,7 @@ public:
bool empty() const override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;


@ -13,8 +13,10 @@ struct DiskType
RAM,
S3,
HDFS,
Encrypted
Encrypted,
WebServer
};
static String toString(Type disk_type)
{
switch (disk_type)
@ -29,6 +31,8 @@ struct DiskType
return "hdfs";
case Type::Encrypted:
return "encrypted";
case Type::WebServer:
return "web";
}
__builtin_unreachable();
}

src/Disks/DiskWebServer.cpp (new file, 336 lines)

@ -0,0 +1,336 @@
#include "DiskWebServer.h"
#include <common/logger_useful.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Disks/ReadIndirectBufferFromWebServer.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IDiskRemote.h>
#include <Access/AccessControlManager.h>
#include <Poco/Exception.h>
#include <re2/re2.h>
#define UUID_PATTERN "[\\w]{8}-[\\w]{4}-[\\w]{4}-[\\w]{4}-[\\w]{12}"
#define EXTRACT_UUID_PATTERN fmt::format(".*/({})/.*", UUID_PATTERN)
#define DIRECTORY_FILE_PATTERN(prefix) fmt::format("{}-({})-(\\w+)-(\\w+\\.\\w+)", prefix, UUID_PATTERN)
#define ROOT_FILE_PATTERN(prefix) fmt::format("{}-({})-(\\w+\\.\\w+)", prefix, UUID_PATTERN)
#define MATCH_DIRECTORY_FILE_PATTERN fmt::format(".*/({})/(\\w+)/(\\w+\\.\\w+)", UUID_PATTERN)
#define MATCH_ROOT_FILE_PATTERN fmt::format(".*/({})/(\\w+\\.\\w+)", UUID_PATTERN)
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int NETWORK_ERROR;
extern const int NOT_IMPLEMENTED;
}
void DiskWebServer::Metadata::initialize(const String & uri_with_path, const String & files_prefix, const String & table_uuid, ContextPtr context) const
{
ReadWriteBufferFromHTTP metadata_buf(Poco::URI(fs::path(uri_with_path) / (".index-" + table_uuid)),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(context));
String uuid, directory, file, remote_file_name;
size_t file_size;
while (!metadata_buf.eof())
{
readText(remote_file_name, metadata_buf);
assertChar('\t', metadata_buf);
readIntText(file_size, metadata_buf);
assertChar('\n', metadata_buf);
LOG_DEBUG(&Poco::Logger::get("DiskWeb"), "Read file: {}, size: {}", remote_file_name, file_size);
/*
* URI/ {prefix}-{uuid}-all_x_x_x-{file}
* ...
* {prefix}-{uuid}-format_version.txt
* {prefix}-{uuid}-detached-{file}
* ...
**/
if (RE2::FullMatch(remote_file_name, DIRECTORY_FILE_PATTERN(files_prefix), &uuid, &directory, &file))
{
if (uuid != table_uuid)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected uuid: {}, expected: {}", uuid, table_uuid);
tables_data[uuid][directory].emplace(File(file, file_size));
}
else if (RE2::FullMatch(remote_file_name, ROOT_FILE_PATTERN(files_prefix), &uuid, &file))
{
if (uuid != table_uuid)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected uuid: {}, expected: {}", uuid, table_uuid);
tables_data[uuid][file].emplace(File(file, file_size));
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", remote_file_name);
}
}
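The `.index-<uuid>` file parsed here is the one `clickhouse static-files-disk-uploader` writes: one tab-separated `<remote file name> <size>` pair per line. An illustrative fragment (prefix `data`, uuid taken from the example in DiskWebServer.h, sizes made up):
``` text
data-a563f7d8-fb00-4d50-a563-f7d8fb007d50-all_1_1_0-columns.txt	123
data-a563f7d8-fb00-4d50-a563-f7d8fb007d50-format_version.txt	1
```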
template <typename T>
class DiskWebDirectoryIterator final : public IDiskDirectoryIterator
{
public:
using Directory = std::unordered_map<String, T>;
DiskWebDirectoryIterator(Directory & directory_, const String & directory_root_)
: directory(directory_), iter(directory.begin()), directory_root(directory_root_)
{
}
void next() override { ++iter; }
bool isValid() const override
{
return iter != directory.end();
}
String path() const override
{
return fs::path(directory_root) / name();
}
String name() const override
{
return iter->first;
}
private:
Directory & directory;
typename Directory::iterator iter;
const String directory_root;
};
class ReadBufferFromWebServer final : public ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>
{
public:
ReadBufferFromWebServer(
const String & uri_,
RemoteMetadata metadata_,
ContextPtr context_,
size_t max_read_tries_,
size_t buf_size_)
: ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>(metadata_)
, uri(uri_)
, context(context_)
, max_read_tries(max_read_tries_)
, buf_size(buf_size_)
{
}
std::unique_ptr<ReadIndirectBufferFromWebServer> createReadBuffer(const String & path) override
{
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, max_read_tries, buf_size);
}
private:
String uri;
ContextPtr context;
size_t max_read_tries;
size_t buf_size;
};
class WriteBufferFromNothing : public WriteBufferFromFile
{
public:
WriteBufferFromNothing() : WriteBufferFromFile("/dev/null") {}
void sync() override {}
};
DiskWebServer::DiskWebServer(
const String & disk_name_,
const String & uri_,
const String & metadata_path_,
ContextPtr context_,
SettingsPtr settings_)
: WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("DiskWeb"))
, uri(uri_)
, name(disk_name_)
, metadata_path(metadata_path_)
, settings(std::move(settings_))
{
}
String DiskWebServer::getFileName(const String & path) const
{
String result;
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN)
&& RE2::Extract(path, MATCH_DIRECTORY_FILE_PATTERN, fmt::format(R"({}-\1-\2-\3)", settings->files_prefix), &result))
return result;
if (RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN)
&& RE2::Extract(path, MATCH_ROOT_FILE_PATTERN, fmt::format(R"({}-\1-\2)", settings->files_prefix), &result))
return result;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", path);
}
bool DiskWebServer::findFileInMetadata(const String & path, File & file_info) const
{
String table_uuid, directory_name, file_name;
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN, &table_uuid, &directory_name, &file_name)
|| RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN, &table_uuid, &file_name))
{
if (directory_name.empty())
directory_name = file_name;
if (!metadata.tables_data.count(table_uuid))
return false;
if (!metadata.tables_data[table_uuid].count(directory_name))
return false;
const auto & files = metadata.tables_data[table_uuid][directory_name];
auto file = files.find(File(file_name));
if (file == files.end())
return false;
file_info = *file;
return true;
}
return false;
}
bool DiskWebServer::exists(const String & path) const
{
LOG_DEBUG(log, "Checking existence of file: {}", path);
File file;
return findFileInMetadata(path, file);
}
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
{
LOG_DEBUG(log, "Read from file by path: {}", path);
File file;
if (!findFileInMetadata(path, file))
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
RemoteMetadata meta(uri, fs::path(path).parent_path() / fs::path(path).filename());
meta.remote_fs_objects.emplace_back(std::make_pair(getFileName(path), file.size));
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskWebServer::writeFile(const String & path, size_t, WriteMode)
{
if (path.ends_with("format_version.txt"))
return std::make_unique<WriteBufferFromNothing>();
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
{
LOG_DEBUG(log, "Iterate directory: {}", path);
String uuid;
if (RE2::FullMatch(path, ".*/store/"))
return std::make_unique<DiskWebDirectoryIterator<RootDirectory>>(metadata.tables_data, path);
if (!RE2::Extract(path, EXTRACT_UUID_PATTERN, "\\1", &uuid))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot extract uuid for: {}", path);
/// Do not throw if this is not a query but a disk load.
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
try
{
if (!metadata.tables_data.count(uuid))
metadata.initialize(uri, settings->files_prefix, uuid, getContext());
}
catch (const Poco::Exception &)
{
const auto message = getCurrentExceptionMessage(false);
if (can_throw)
{
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
}
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Cannot load disk metadata. Error: {}", message);
/// Empty iterator.
return std::make_unique<DiskWebDirectoryIterator<Directory>>(metadata.tables_data[""], path);
}
return std::make_unique<DiskWebDirectoryIterator<Directory>>(metadata.tables_data[uuid], path);
}
size_t DiskWebServer::getFileSize(const String & path) const
{
File file;
if (!findFileInMetadata(path, file))
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
return file.size;
}
bool DiskWebServer::isFile(const String & path) const
{
return RE2::FullMatch(path, ".*/\\w+.\\w+");
}
bool DiskWebServer::isDirectory(const String & path) const
{
return RE2::FullMatch(path, ".*/\\w+");
}
void registerDiskWebServer(DiskFactory & factory)
{
auto creator = [](const String & disk_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String uri{config.getString(config_prefix + ".endpoint")};
if (!uri.ends_with('/'))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "URI must end with '/', but '{}' doesn't.", uri);
auto settings = std::make_unique<DiskWebServerSettings>(
context->getGlobalContext()->getSettingsRef().http_max_single_read_retries,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getString(config_prefix + ".files_prefix", disk_name));
String metadata_path = fs::path(context->getPath()) / "disks" / disk_name / "";
return std::make_shared<DiskWebServer>(disk_name, uri, metadata_path, context, std::move(settings));
};
factory.registerDiskType("web", creator);
}
}

src/Disks/DiskWebServer.h (new file, 234 lines)

@ -0,0 +1,234 @@
#pragma once
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h>
#include <Core/UUID.h>
#include <set>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
struct DiskWebServerSettings
{
/// Number of read attempts before throwing an exception that the network is unreachable.
size_t max_read_tries;
/// Passed to SeekAvoidingReadBuffer.
size_t min_bytes_for_seek;
String files_prefix;
DiskWebServerSettings(size_t max_read_tries_, size_t min_bytes_for_seek_, String files_prefix_)
: max_read_tries(max_read_tries_) , min_bytes_for_seek(min_bytes_for_seek_), files_prefix(files_prefix_) {}
};
/*
* Quick ready test: you can try this disk by using these queries (the disk has two tables) and this endpoint:
*
* ATTACH TABLE contributors UUID 'a563f7d8-fb00-4d50-a563-f7d8fb007d50' (good_person_name String) engine=MergeTree() order by good_person_name settings storage_policy='web';
* ATTACH TABLE test UUID '11c7a2f9-a949-4c88-91c7-a2f9a949ec88' (a Int32) engine=MergeTree() order by a settings storage_policy='web';
*
* <storage_configuration>
* <disks>
* <web>
* <type>web</type>
* <endpoint>https://clickhouse-datasets.s3.yandex.net/kssenii-static-files-disk-test/kssenii-disk-tests/test1/</endpoint>
* <files_prefix>data</files_prefix>
* </web>
* </disks>
* <policies>
* <web>
* <volumes>
* <main>
* <disk>web</disk>
* </main>
* </volumes>
* </web>
* </policies>
* </storage_configuration>
*
* To get files for upload run:
* clickhouse static-files-disk-uploader --metadata-path <path> --output-dir <dir> --files-prefix data
* (--metadata-path can be found in query: `select data_paths from system.tables where name='<table_name>';`)
*
* If the url is not reachable during disk load while the server is starting up tables, all errors are caught.
* If there were errors in this case, tables can be reloaded (made visible again) via DETACH TABLE table_name -> ATTACH TABLE table_name.
* If metadata was successfully loaded at server startup, then tables are available straight away.
**/
class DiskWebServer : public IDisk, WithContext
{
using SettingsPtr = std::unique_ptr<DiskWebServerSettings>;
public:
DiskWebServer(const String & disk_name_,
const String & files_root_path_uri_,
const String & metadata_path_,
ContextPtr context,
SettingsPtr settings_);
struct File
{
String name;
size_t size;
File(const String & name_ = "", const size_t size_ = 0) : name(name_), size(size_) {}
bool operator<(const File & other) const { return name < other.name; }
bool operator==(const File & other) const { return name == other.name; }
};
using Directory = std::set<File>;
/* Each root directory contains either directories like
* all_x_x_x/{file}, detached/, etc, or root files like format_version.txt.
*/
using RootDirectory = std::unordered_map<String, Directory>;
/* Each table is attached via ATTACH TABLE table UUID <uuid> <def>.
* Then there is a mapping: {table uuid} -> {root directory}
*/
using TableDirectories = std::unordered_map<String, RootDirectory>;
struct Metadata
{
/// Fetch meta only when required.
mutable TableDirectories tables_data;
Metadata() {}
void initialize(const String & uri_with_path, const String & files_prefix, const String & uuid, ContextPtr context) const;
};
bool findFileInMetadata(const String & path, File & file_info) const;
bool supportZeroCopyReplication() const override { return false; }
String getFileName(const String & path) const;
DiskType::Type getType() const override { return DiskType::Type::WebServer; }
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
/// Disk info
const String & getName() const final override { return name; }
const String & getPath() const final override { return metadata_path; }
bool isReadOnly() const override { return true; }
UInt64 getTotalSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const final override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const final override { return std::numeric_limits<UInt64>::max(); }
/// Read-only part
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
size_t getFileSize(const String & path) const override;
void listFiles(const String & /* path */, std::vector<String> & /* file_names */) override { }
void setReadOnly(const String & /* path */) override {}
bool isDirectory(const String & path) const override;
DiskDirectoryIteratorPtr iterateDirectory(const String & /* path */) override;
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; }
/// Write and modification part
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String &, size_t, WriteMode) override;
void moveFile(const String &, const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void replaceFile(const String &, const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeFile(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeFileIfExists(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
ReservationPtr reserve(UInt64 /*bytes*/) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeRecursive(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeSharedFile(const String &, bool) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeSharedRecursive(const String &, bool) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void clearDirectory(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void moveDirectory(const String &, const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeDirectory(const String &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void setLastModified(const String &, const Poco::Timestamp &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
/// Create part
void createFile(const String &) final override {}
void createDirectory(const String &) override {}
void createDirectories(const String &) override {}
void createHardLink(const String &, const String &) override {}
private:
Poco::Logger * log;
String uri, name;
const String metadata_path;
SettingsPtr settings;
Metadata metadata;
};
}
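
For orientation, here is a minimal self-contained sketch of how a lookup over the nested `TableDirectories` mapping declared above could work. The aliases mirror the class declarations; `find_file`, the sample UUID data, and `main` are hypothetical illustrations rather than the actual `findFileInMetadata` implementation:

``` cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <set>
#include <string>
#include <unordered_map>

// Mirrors DiskWebServer::File: ordered and compared by name only.
struct File
{
    std::string name;
    size_t size = 0;
    bool operator<(const File & other) const { return name < other.name; }
};

using Directory = std::set<File>;
// Directory name (e.g. "all_1_1_0") -> files inside it.
using RootDirectory = std::unordered_map<std::string, Directory>;
// Table UUID -> its root directory.
using TableDirectories = std::unordered_map<std::string, RootDirectory>;

// Hypothetical lookup of "uuid / dir / file" against the nested maps.
std::optional<File> find_file(const TableDirectories & tables, const std::string & uuid,
                              const std::string & dir, const std::string & file_name)
{
    auto table_it = tables.find(uuid);
    if (table_it == tables.end())
        return std::nullopt;
    auto dir_it = table_it->second.find(dir);
    if (dir_it == table_it->second.end())
        return std::nullopt;
    auto file_it = dir_it->second.find(File{file_name});
    if (file_it == dir_it->second.end())
        return std::nullopt;
    return *file_it;
}

int main()
{
    TableDirectories tables;
    tables["11c7a2f9-a949-4c88-91c7-a2f9a949ec88"]["all_1_1_0"].insert(File{"data.bin", 1024});
    if (auto file = find_file(tables, "11c7a2f9-a949-4c88-91c7-a2f9a949ec88", "all_1_1_0", "data.bin"))
        std::cout << file->name << ": " << file->size << " bytes\n";
}
```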

View File

@ -216,6 +216,8 @@ public:
/// Overridden in remote FS disks.
virtual bool supportZeroCopyReplication() const = 0;
virtual bool isReadOnly() const { return false; }
/// Invoked when Global Context is shutdown.
virtual void shutdown() {}

View File

@ -33,10 +33,9 @@ IDiskRemote::Metadata::Metadata(
const String & disk_path_,
const String & metadata_file_path_,
bool create)
: remote_fs_root_path(remote_fs_root_path_)
: RemoteMetadata(remote_fs_root_path_, metadata_file_path_)
, disk_path(disk_path_)
, metadata_file_path(metadata_file_path_)
, total_size(0), remote_fs_objects(0), ref_count(0)
, total_size(0), ref_count(0)
{
if (create)
return;
@ -72,10 +71,9 @@ IDiskRemote::Metadata::Metadata(
readEscapedString(remote_fs_object_path, buf);
if (version == VERSION_ABSOLUTE_PATHS)
{
if (!boost::algorithm::starts_with(remote_fs_object_path, remote_fs_root_path))
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond S3 root path. Path: {}, root path: {}, disk path: {}",
if (!remote_fs_object_path.starts_with(remote_fs_root_path))
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
remote_fs_object_path, remote_fs_root_path, disk_path_);
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());

View File

@ -148,34 +148,42 @@ private:
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
/// Remote FS (S3, HDFS) metadata file layout:
/// Number of FS objects, Total size of all FS objects.
/// Each FS object represents path where object located in FS and size of object.
struct IDiskRemote::Metadata
/// Minimum info, required to be passed to ReadIndirectBufferFromRemoteFS<T>
struct RemoteMetadata
{
using PathAndSize = std::pair<String, size_t>;
/// Remote FS objects paths and their sizes.
std::vector<PathAndSize> remote_fs_objects;
/// URI
const String & remote_fs_root_path;
/// Relative path to metadata file on local FS.
const String metadata_file_path;
RemoteMetadata(const String & remote_fs_root_path_, const String & metadata_file_path_)
: remote_fs_root_path(remote_fs_root_path_), metadata_file_path(metadata_file_path_) {}
};
/// Remote FS (S3, HDFS) metadata file layout:
/// FS objects, their number and total size of all FS objects.
/// Each FS object represents a file path in remote FS and its size.
struct IDiskRemote::Metadata : RemoteMetadata
{
/// Metadata file version.
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;
using PathAndSize = std::pair<String, size_t>;
/// Remote FS (S3, HDFS) root path.
const String & remote_fs_root_path;
/// Disk path.
const String & disk_path;
/// Relative path to metadata file on local FS.
String metadata_file_path;
/// Total size of all remote FS (S3, HDFS) objects.
size_t total_size = 0;
/// Remote FS (S3, HDFS) objects paths and their sizes.
std::vector<PathAndSize> remote_fs_objects;
/// Number of references (hardlinks) to this metadata file.
UInt32 ref_count = 0;
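
As a rough illustration of the split introduced above, the sketch below mimics `RemoteMetadata`: relative object paths stored in the metadata are joined with the remote FS root path only at read time, the same way the S3 disk below switches to `fs::path(metadata.remote_fs_root_path) / path`. The struct is a simplified value-type stand-in (the real one holds a reference), and the sample paths are made up:

``` cpp
#include <cstddef>
#include <filesystem>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

namespace fs = std::filesystem;

// Simplified stand-in for RemoteMetadata: a root path plus relative object paths with sizes.
struct RemoteMetadataSketch
{
    std::string remote_fs_root_path;                                // e.g. an S3 key prefix
    std::vector<std::pair<std::string, size_t>> remote_fs_objects;  // {relative path, size}
};

int main()
{
    RemoteMetadataSketch meta{"data/disk_s3/", {{"abc/def_1", 4096}, {"abc/def_2", 512}}};

    // Joining with fs::path avoids a missing or duplicated '/' between prefix and object name.
    for (const auto & [relative_path, size] : meta.remote_fs_objects)
        std::cout << (fs::path(meta.remote_fs_root_path) / relative_path).string()
                  << " (" << size << " bytes)\n";
}
```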

View File

@ -1,8 +1,8 @@
#include "ReadIndirectBufferFromRemoteFS.h"
#if USE_AWS_S3 || USE_HDFS
#include <IO/ReadBufferFromS3.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/ReadIndirectBufferFromWebServer.h>
namespace DB
@ -16,7 +16,7 @@ namespace ErrorCodes
template<typename T>
ReadIndirectBufferFromRemoteFS<T>::ReadIndirectBufferFromRemoteFS(
IDiskRemote::Metadata metadata_)
RemoteMetadata metadata_)
: metadata(std::move(metadata_))
{
}
@ -137,6 +137,7 @@ template
class ReadIndirectBufferFromRemoteFS<ReadBufferFromHDFS>;
#endif
}
template
class ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>;
#endif
}

View File

@ -4,8 +4,6 @@
#include <Common/config.h>
#endif
#if USE_AWS_S3 || USE_HDFS
#include <IO/ReadBufferFromFile.h>
#include <Disks/IDiskRemote.h>
#include <utility>
@ -19,7 +17,7 @@ template <typename T>
class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
explicit ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_);
explicit ReadIndirectBufferFromRemoteFS(RemoteMetadata metadata_);
off_t seek(off_t offset_, int whence) override;
@ -30,7 +28,7 @@ public:
virtual std::unique_ptr<T> createReadBuffer(const String & path) = 0;
protected:
IDiskRemote::Metadata metadata;
RemoteMetadata metadata;
private:
std::unique_ptr<T> initialize();
@ -47,5 +45,3 @@ private:
};
}
#endif

View File

@ -0,0 +1,132 @@
#include "ReadIndirectBufferFromWebServer.h"
#include <common/logger_useful.h>
#include <Core/Types.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <thread>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int NETWORK_ERROR;
}
ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer(const String & url_,
ContextPtr context_,
size_t max_read_tries_,
size_t buf_size_)
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size_)
, log(&Poco::Logger::get("ReadIndirectBufferFromWebServer"))
, context(context_)
, url(url_)
, buf_size(buf_size_)
, max_read_tries(max_read_tries_)
{
}
std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
{
Poco::URI uri(url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
headers.emplace_back("Range", fmt::format("bytes={}-", offset));
LOG_DEBUG(log, "Reading from offset: {}", offset);
return std::make_unique<ReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(context),
0,
Poco::Net::HTTPBasicCredentials{},
buf_size,
headers);
}
bool ReadIndirectBufferFromWebServer::nextImpl()
{
bool next_result = false, successful_read = false;
if (impl)
{
/// Restore correct position at the needed offset.
impl->position() = position();
assert(!impl->hasPendingData());
}
else
{
try
{
impl = initialize();
}
catch (const Poco::Exception & e)
{
throw Exception(ErrorCodes::NETWORK_ERROR, "Unreachable url: {}. Error: {}", url, e.what());
}
next_result = impl->hasPendingData();
}
for (size_t try_num = 0; (try_num < max_read_tries) && !next_result; ++try_num)
{
try
{
next_result = impl->next();
successful_read = true;
break;
}
catch (const Exception & e)
{
LOG_WARNING(log, "Read attempt {}/{} failed from {}. ({})", try_num + 1, max_read_tries, url, e.message());
impl.reset();
impl = initialize();
next_result = impl->hasPendingData();
}
}
if (!successful_read && !next_result)
throw Exception(ErrorCodes::NETWORK_ERROR, "All read attempts ({}) failed for URI: {}", max_read_tries, url);
if (next_result)
{
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
offset += working_buffer.size();
}
return next_result;
}
off_t ReadIndirectBufferFromWebServer::seek(off_t offset_, int whence)
{
if (impl)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer");
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed");
if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", std::to_string(offset_));
offset = offset_;
return offset;
}
off_t ReadIndirectBufferFromWebServer::getPosition()
{
return offset - available();
}
}
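
The buffer above resumes interrupted downloads by re-requesting the URL with a `Range: bytes=<offset>-` header and retrying up to `max_read_tries` times. Below is a compact standalone sketch of that retry-and-resume control flow; `FetchFromOffset` is a hypothetical stand-in for the HTTP machinery, not a real ClickHouse API:

``` cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical transport: returns the next chunk starting at `offset`
// (empty vector on EOF) and may throw on network errors.
using FetchFromOffset = std::function<std::vector<char>(size_t offset)>;

// Retry-and-resume loop in the spirit of ReadIndirectBufferFromWebServer::nextImpl().
std::vector<char> read_chunk_with_retries(const FetchFromOffset & fetch_from,
                                          size_t & offset, size_t max_read_tries)
{
    for (size_t try_num = 0; try_num < max_read_tries; ++try_num)
    {
        try
        {
            // Equivalent of re-sending the request with "Range: bytes=<offset>-".
            std::vector<char> chunk = fetch_from(offset);
            offset += chunk.size();  // advance so a later retry resumes here, not at zero
            return chunk;
        }
        catch (const std::exception & e)
        {
            std::cerr << "Read attempt " << try_num + 1 << "/" << max_read_tries
                      << " failed: " << e.what() << "\n";
        }
    }
    throw std::runtime_error("All read attempts failed");
}
```

The key design point is that `offset` only advances on success, so every retry asks the server for exactly the bytes that were never delivered.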

View File

@ -0,0 +1,44 @@
#pragma once
#include <IO/SeekableReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Interpreters/Context.h>
namespace DB
{
/* A read buffer which reads over HTTP but is used as a ReadBufferFromFileBase.
* Used to read files hosted on a web server with static files.
*
* Usage: ReadIndirectBufferFromRemoteFS -> SeekAvoidingReadBuffer -> ReadIndirectBufferFromWebServer -> ReadWriteBufferFromHTTP.
*/
class ReadIndirectBufferFromWebServer : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
explicit ReadIndirectBufferFromWebServer(const String & url_,
ContextPtr context_,
size_t max_read_tries_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
private:
std::unique_ptr<ReadBuffer> initialize();
Poco::Logger * log;
ContextPtr context;
const String url;
size_t buf_size, max_read_tries;
std::unique_ptr<ReadBuffer> impl;
off_t offset = 0;
};
}
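
The chain described in the header comment puts SeekAvoidingReadBuffer in front of this class because a real seek costs a new HTTP request with a different Range header. A hypothetical sketch of the underlying decision, assuming the threshold semantics implied by `min_bytes_for_seek`:

``` cpp
#include <cstddef>
#include <iostream>

// Sketch of the SeekAvoidingReadBuffer idea: a short forward seek is cheaper to
// satisfy by reading and discarding the gap than by repositioning (which, for
// the web disk, means a new HTTP request with a different Range header).
bool should_really_seek(size_t current_pos, size_t target_pos, size_t min_bytes_for_seek)
{
    if (target_pos < current_pos)
        return true;  // backward seek: no choice but to reposition
    return target_pos - current_pos >= min_bytes_for_seek;  // long forward jump
}

int main()
{
    constexpr size_t min_bytes_for_seek = 1024 * 1024;  // hypothetical threshold
    std::cout << should_really_seek(0, 4096, min_bytes_for_seek) << "\n";               // 0: read and discard
    std::cout << should_really_seek(0, 16 * 1024 * 1024, min_bytes_for_seek) << "\n";   // 1: reposition
}
```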

View File

@ -146,7 +146,7 @@ public:
std::unique_ptr<ReadBufferFromS3> createReadBuffer(const String & path) override
{
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.remote_fs_root_path + path, max_single_read_retries, buf_size);
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, buf_size);
}
private:

View File

@ -1,8 +1,8 @@
#include "WriteIndirectBufferFromRemoteFS.h"
#if USE_AWS_S3 || USE_HDFS
#include <IO/WriteBufferFromS3.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/WriteBufferFromHTTP.h>
namespace DB
@ -65,6 +65,7 @@ template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>;
#endif
}
template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromHTTP>;
#endif
}

View File

@ -4,8 +4,6 @@
#include <Common/config.h>
#endif
#if USE_AWS_S3 || USE_HDFS
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h>
@ -38,5 +36,3 @@ private:
};
}
#endif

View File

@ -24,6 +24,8 @@ void registerDiskEncrypted(DiskFactory & factory);
void registerDiskHDFS(DiskFactory & factory);
#endif
void registerDiskWebServer(DiskFactory & factory);
void registerDisks()
{
@ -43,6 +45,8 @@ void registerDisks()
#if USE_HDFS
registerDiskHDFS(factory);
#endif
registerDiskWebServer(factory);
}
}

View File

@ -16,6 +16,7 @@ SRCS(
DiskMemory.cpp
DiskRestartProxy.cpp
DiskSelector.cpp
DiskWebServer.cpp
IDisk.cpp
IDiskRemote.cpp
IVolume.cpp
@ -23,6 +24,7 @@ SRCS(
ReadIndirectBufferFromRemoteFS.cpp
SingleDiskVolume.cpp
StoragePolicy.cpp
TemporaryFileOnDisk.cpp
VolumeJBOD.cpp
VolumeRAID1.cpp
WriteIndirectBufferFromRemoteFS.cpp

View File

@ -7,7 +7,7 @@ PEERDIR(
)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -161,14 +161,12 @@ InputFormatPtr FormatFactory::getInput(
if (settings.max_memory_usage_for_user && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage_for_user)
parallel_parsing = false;
if (parallel_parsing && name == "JSONEachRow")
if (parallel_parsing)
{
/// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix()
/// For JSONEachRow we can safely skip whitespace characters
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == '[')
parallel_parsing = false; /// Disable it for JSONEachRow if data is in square brackets (see JSONEachRowRowInputFormat)
const auto & non_trivial_prefix_and_suffix_checker = getCreators(name).non_trivial_prefix_and_suffix_checker;
/// Disable parallel parsing for input formats with non-trivial readPrefix() and readSuffix().
if (non_trivial_prefix_and_suffix_checker && non_trivial_prefix_and_suffix_checker(buf))
parallel_parsing = false;
}
if (parallel_parsing)
@ -392,6 +390,14 @@ void FormatFactory::registerInputFormatProcessor(const String & name, InputProce
target = std::move(input_creator);
}
void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker)
{
auto & target = dict[name].non_trivial_prefix_and_suffix_checker;
if (target)
throw Exception("FormatFactory: Non trivial prefix and suffix checker " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(non_trivial_prefix_and_suffix_checker);
}
void FormatFactory::registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator)
{
auto & target = dict[name].output_processor_creator;

View File

@ -93,6 +93,11 @@ private:
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
/// Some input formats can have a non-trivial readPrefix() and readSuffix(),
/// so in some cases parallel parsing cannot be used.
/// The checker should return true if parallel parsing should be disabled.
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
struct Creators
{
InputCreator input_creator;
@ -102,6 +107,7 @@ private:
FileSegmentationEngine file_segmentation_engine;
bool supports_parallel_formatting{false};
bool is_column_oriented{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -166,6 +172,8 @@ public:
void registerOutputFormat(const String & name, OutputCreator output_creator);
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);

View File

@ -87,4 +87,11 @@ std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, D
return {loadAtPosition(in, memory, pos), number_of_rows};
}
bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf)
{
/// For JSONEachRow we can safely skip whitespace characters
skipWhitespaceIfAny(buf);
return buf.eof() || *buf.position() == '[';
}
}
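
For illustration, a standalone re-implementation of this check (not the ClickHouse buffer-based code): after skipping whitespace, a stream that is empty or begins with '[' has a non-trivial prefix/suffix, which makes FormatFactory::getInput fall back to sequential parsing:

``` cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Standalone equivalent of nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl:
// returns true when parallel parsing must be disabled.
bool has_non_trivial_prefix(std::string_view data)
{
    size_t pos = data.find_first_not_of(" \t\n\r");  // skipWhitespaceIfAny
    return pos == std::string_view::npos || data[pos] == '[';
}

int main()
{
    assert(!has_non_trivial_prefix("{\"a\":1}\n{\"a\":2}"));   // plain rows: parallel parsing is fine
    assert(has_non_trivial_prefix(" [{\"a\":1},{\"a\":2}]"));  // array form: must parse sequentially
    assert(has_non_trivial_prefix("   "));                     // empty stream: also disabled
}
```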

View File

@ -5,4 +5,6 @@ namespace DB
std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf);
}

View File

@ -79,6 +79,9 @@ void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
/// Non trivial prefix and suffix checkers for disabling parallel parsing.
void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory);
void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factory);
void registerFormats()
{
@ -153,6 +156,9 @@ void registerFormats()
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(factory);
#endif
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);
}
}

View File

@ -10,7 +10,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -290,6 +290,7 @@ SRCS(
geohashesInBox.cpp
getMacro.cpp
getScalar.cpp
getServerPort.cpp
getSetting.cpp
getSizeOfEnumType.cpp
globalVariable.cpp
@ -444,6 +445,7 @@ SRCS(
registerFunctionsNull.cpp
registerFunctionsRandom.cpp
registerFunctionsReinterpret.cpp
registerFunctionsSnowflake.cpp
registerFunctionsString.cpp
registerFunctionsStringRegexp.cpp
registerFunctionsStringSearch.cpp
@ -477,12 +479,14 @@ SRCS(
s2RectIntersection.cpp
s2RectUnion.cpp
s2ToGeo.cpp
serverUUID.cpp
sigmoid.cpp
sign.cpp
sin.cpp
sinh.cpp
sleep.cpp
sleepEachRow.cpp
snowflake.cpp
sqrt.cpp
startsWith.cpp
stem.cpp

View File

@ -315,6 +315,7 @@ void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPR
if (!(status == Poco::Net::HTTPResponse::HTTP_OK
|| status == Poco::Net::HTTPResponse::HTTP_CREATED
|| status == Poco::Net::HTTPResponse::HTTP_ACCEPTED
|| status == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT /// Reading with Range header was successful.
|| (isRedirect(status) && allow_redirects)))
{
std::stringstream error_message; // STYLE_CHECK_ALLOW_STD_STRING_STREAM

View File

@ -57,6 +57,7 @@ SRCS(
ReadBufferFromMemory.cpp
ReadBufferFromPocoSocket.cpp
ReadHelpers.cpp
ReadIndirectBufferFromWebServer.cpp
SeekAvoidingReadBuffer.cpp
TimeoutSetter.cpp
UseSSL.cpp

View File

@ -18,7 +18,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -157,6 +157,15 @@ void DatabaseCatalog::loadDatabases()
/// Another background thread which drops temporary LiveViews.
/// We should start it after loadMarkedAsDroppedTables() to avoid a race condition.
TemporaryLiveViewCleaner::instance().startup();
/// Start up tables after all databases are loaded.
for (const auto & [database_name, database] : databases)
{
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue;
database->startupTables();
}
}
void DatabaseCatalog::shutdownImpl()
@ -609,6 +618,12 @@ Dependencies DatabaseCatalog::getDependencies(const StorageID & from) const
return Dependencies(iter->second.begin(), iter->second.end());
}
ViewDependencies DatabaseCatalog::getViewDependencies() const
{
std::lock_guard lock{databases_mutex};
return ViewDependencies(view_dependencies.begin(), view_dependencies.end());
}
void
DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID & old_where, const StorageID & new_from,
const StorageID & new_where)

View File

@ -174,6 +174,7 @@ public:
void addDependency(const StorageID & from, const StorageID & where);
void removeDependency(const StorageID & from, const StorageID & where);
Dependencies getDependencies(const StorageID & from) const;
ViewDependencies getViewDependencies() const;
/// For Materialized and Live View
void updateDependency(const StorageID & old_from, const StorageID & old_where,const StorageID & new_from, const StorageID & new_where);

View File

@ -32,6 +32,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
extern const int NOT_IMPLEMENTED;
extern const int TABLE_IS_READ_ONLY;
}
@ -62,6 +63,8 @@ BlockIO InterpreterAlterQuery::execute()
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->isReadOnly())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
auto alter_lock = table->lockForAlter(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
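
The same guard is repeated in the DROP/DETACH/TRUNCATE paths further down. A minimal sketch of the pattern, with `StorageSketch` as a hypothetical stand-in for IStorage:

``` cpp
#include <stdexcept>

// Hypothetical stand-in for IStorage with the isReadOnly() hook used in this change.
struct StorageSketch
{
    bool read_only = false;
    bool isReadOnly() const { return read_only; }
};

// Guard placed at the start of every mutating operation (ALTER, DETACH, TRUNCATE, ...).
void throw_if_read_only(const StorageSketch & table)
{
    if (table.isReadOnly())
        throw std::runtime_error("Table is read-only");
}
```

Tables whose storage policy resolves to the read-only web disk trip this check early, so mutating queries fail fast instead of reaching the disk layer, where every write method throws NOT_IMPLEMENTED anyway.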

View File

@ -272,7 +272,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
}
/// We use the global context here, because the storages' lifetime is longer than the query context's lifetime
database->loadStoredObjects(getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach); //-V560
database->loadStoredObjects(
getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach, skip_startup_tables); //-V560
}
catch (...)
{

View File

@ -52,6 +52,11 @@ public:
force_attach = force_attach_;
}
void setSkipStartupTables(bool skip_startup_tables_)
{
skip_startup_tables = skip_startup_tables_;
}
/// Obtain information about columns, their types, default values and column comments,
/// for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach);
@ -94,6 +99,7 @@ private:
/// Is this an internal query - not from the user.
bool internal = false;
bool force_attach = false;
bool skip_startup_tables = false;
mutable String as_database_saved;
mutable String as_table_saved;

View File

@ -34,6 +34,7 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_QUERY;
extern const int TABLE_IS_READ_ONLY;
}
@ -162,6 +163,8 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
if (query.kind == ASTDropQuery::Kind::Detach)
{
getContext()->checkAccess(drop_storage, table_id);
if (table->isReadOnly())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
if (table->isDictionary())
{
@ -195,6 +198,8 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
throw Exception("Cannot TRUNCATE dictionary", ErrorCodes::SYNTAX_ERROR);
getContext()->checkAccess(AccessType::TRUNCATE, table_id);
if (table->isReadOnly())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
table->checkTableCanBeDropped();

View File

@ -288,7 +288,7 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
#if defined(ARCADIA_BUILD)
/// This is a harmful field that is used only in the foreign "Arcadia" build.
if (const auto * basic_credentials = dynamic_cast<const BasicCredentials *>(&credentials_))
session_client_info->current_password = basic_credentials->getPassword();
prepared_client_info->current_password = basic_credentials->getPassword();
#endif
}

View File

@ -43,6 +43,7 @@ static void executeCreateQuery(
interpreter.setInternal(true);
interpreter.setForceAttach(true);
interpreter.setForceRestoreData(has_force_restore_data_flag);
interpreter.setSkipStartupTables(true);
interpreter.execute();
}

View File

@ -70,6 +70,7 @@ SRCS(
InJoinSubqueriesPreprocessor.cpp
InternalTextLogsQueue.cpp
InterpreterAlterQuery.cpp
InterpreterBackupQuery.cpp
InterpreterCheckQuery.cpp
InterpreterCreateFunctionQuery.cpp
InterpreterCreateQuery.cpp
@ -146,6 +147,7 @@ SRCS(
RewriteSumIfFunctionVisitor.cpp
RowRefs.cpp
SelectIntersectExceptQueryVisitor.cpp
Session.cpp
Set.cpp
SetVariants.cpp
SortedBlocksWriter.cpp

View File

@ -11,6 +11,7 @@ PEERDIR(
SRCS(
ASTAlterQuery.cpp
ASTAsterisk.cpp
ASTBackupQuery.cpp
ASTColumnDeclaration.cpp
ASTColumnsMatcher.cpp
ASTColumnsTransformers.cpp
@ -51,6 +52,7 @@ SRCS(
ASTRolesOrUsersSet.cpp
ASTRowPolicyName.cpp
ASTSampleRatio.cpp
ASTSelectIntersectExceptQuery.cpp
ASTSelectQuery.cpp
ASTSelectWithUnionQuery.cpp
ASTSetQuery.cpp
@ -89,6 +91,7 @@ SRCS(
MySQL/ASTDeclareSubPartition.cpp
MySQL/ASTDeclareTableOptions.cpp
ParserAlterQuery.cpp
ParserBackupQuery.cpp
ParserCase.cpp
ParserCheckQuery.cpp
ParserCreateFunctionQuery.cpp
@ -142,6 +145,7 @@ SRCS(
TokenIterator.cpp
formatAST.cpp
formatSettingName.cpp
getInsertQuery.cpp
iostream_debug_helpers.cpp
makeASTForLogicalFunction.cpp
obfuscateQueries.cpp

View File

@ -8,7 +8,7 @@ PEERDIR(
SRCS(
<? find . -name '*.cpp' | grep -v -F New | grep -v -F tests | grep -v -F examples | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F New | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -34,6 +34,35 @@ void JSONAsStringRowInputFormat::resetParser()
buf.reset();
}
void JSONAsStringRowInputFormat::readPrefix()
{
/// In this format, a BOM at the beginning of the stream cannot be confused with a value, so it is safe to skip it.
skipBOMIfExists(buf);
skipWhitespaceIfAny(buf);
if (!buf.eof() && *buf.position() == '[')
{
++buf.position();
data_in_square_brackets = true;
}
}
void JSONAsStringRowInputFormat::readSuffix()
{
skipWhitespaceIfAny(buf);
if (data_in_square_brackets)
{
assertChar(']', buf);
skipWhitespaceIfAny(buf);
}
if (!buf.eof() && *buf.position() == ';')
{
++buf.position();
skipWhitespaceIfAny(buf);
}
assertEOF(buf);
}
void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
{
PeekableReadBufferCheckpoint checkpoint{buf};
@ -113,7 +142,23 @@ void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (!allow_new_rows)
return false;
skipWhitespaceIfAny(buf);
if (!buf.eof())
{
if (!data_in_square_brackets && *buf.position() == ';')
{
/// ';' means the end of the query, but it cannot appear before ']'.
return allow_new_rows = false;
}
else if (data_in_square_brackets && *buf.position() == ']')
{
/// ']' means the end of the query.
return allow_new_rows = false;
}
}
if (!buf.eof())
readJSONObject(*columns[0]);
@ -143,4 +188,9 @@ void registerFileSegmentationEngineJSONAsString(FormatFactory & factory)
factory.registerFileSegmentationEngine("JSONAsString", &fileSegmentationEngineJSONEachRowImpl);
}
void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factory)
{
factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsString", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
}
}
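
A standalone sketch of the envelope that readPrefix()/readSuffix() accept above: an optional outer '[' ... ']' around the rows plus an optional trailing ';'. The helper below is a simplified illustration over a whole string, not the real PeekableReadBuffer-based logic:

``` cpp
#include <cassert>
#include <cctype>
#include <string_view>

// Simplified validity check of the accepted envelope: [ rows ] ; where the
// brackets must be paired (or both absent) and the trailing ';' is optional.
bool envelope_is_valid(std::string_view s)
{
    auto trim = [](std::string_view v)
    {
        while (!v.empty() && std::isspace(static_cast<unsigned char>(v.front())))
            v.remove_prefix(1);
        while (!v.empty() && std::isspace(static_cast<unsigned char>(v.back())))
            v.remove_suffix(1);
        return v;
    };
    s = trim(s);
    if (!s.empty() && s.back() == ';')  // optional trailing ';'
        s = trim(s.substr(0, s.size() - 1));
    bool open = !s.empty() && s.front() == '[';
    bool close = !s.empty() && s.back() == ']';
    return open == close;  // '[' and ']' must come as a pair
}

int main()
{
    assert(envelope_is_valid("{\"k\":1} {\"k\":2}"));
    assert(envelope_is_valid("[{\"k\":1},{\"k\":2}];"));
    assert(!envelope_is_valid("[{\"k\":1}"));  // unbalanced bracket
}
```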

View File

@ -22,10 +22,17 @@ public:
String getName() const override { return "JSONAsStringRowInputFormat"; }
void resetParser() override;
void readPrefix() override;
void readSuffix() override;
private:
void readJSONObject(IColumn & column);
PeekableReadBuffer buf;
/// This flag is needed to know if data is in square brackets.
bool data_in_square_brackets = false;
bool allow_new_rows = true;
};
}

View File

@ -359,4 +359,10 @@ void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
factory.registerFileSegmentationEngine("JSONStringsEachRow", &fileSegmentationEngineJSONEachRowImpl);
}
void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory)
{
factory.registerNonTrivialPrefixAndSuffixChecker("JSONEachRow", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
factory.registerNonTrivialPrefixAndSuffixChecker("JSONStringsEachRow", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
}
}

View File

@ -6,7 +6,7 @@ namespace DB
{
SinkToOutputStream::SinkToOutputStream(BlockOutputStreamPtr stream_)
: ISink(stream_->getHeader())
: SinkToStorage(stream_->getHeader())
, stream(std::move(stream_))
{
stream->writePrefix();

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/ISink.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
{
@ -9,7 +9,7 @@ using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
/// Sink which writes data to IBlockOutputStream.
/// It's a temporary wrapper.
class SinkToOutputStream : public ISink
class SinkToOutputStream : public SinkToStorage
{
public:
explicit SinkToOutputStream(BlockOutputStreamPtr stream);

View File

@ -118,6 +118,7 @@ SRCS(
QueryPlan/IQueryPlanStep.cpp
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp
QueryPlan/IntersectOrExceptStep.cpp
QueryPlan/JoinStep.cpp
QueryPlan/LimitByStep.cpp
QueryPlan/LimitStep.cpp
@ -165,6 +166,7 @@ SRCS(
Transforms/FillingTransform.cpp
Transforms/FilterTransform.cpp
Transforms/FinishSortingTransform.cpp
Transforms/IntersectOrExceptTransform.cpp
Transforms/JoiningTransform.cpp
Transforms/LimitByTransform.cpp
Transforms/LimitsCheckingTransform.cpp

View File

@ -16,7 +16,7 @@ ADDINCL(
CFLAGS(-DUSE_ARROW=1)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -P 'Avro|ORC|Parquet|CapnProto' | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -P 'Avro|ORC|Parquet|CapnProto' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -123,6 +123,12 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::A
if (host.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
// The LIBHDFS3_CONF env variable must be set *before* HDFSBuilderWrapper construction.
const String & libhdfs3_conf = config.getString(HDFSBuilderWrapper::CONFIG_PREFIX + ".libhdfs3_conf", "");
if (!libhdfs3_conf.empty())
{
setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), 1);
}
HDFSBuilderWrapper builder;
if (builder.get() == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " +

Some files were not shown because too many files have changed in this diff.