Merge remote-tracking branch 'upstream/master' into replxx

Ivan Lezhankin committed 2020-01-21 17:25:09 +03:00
commit 482a0b1c1a
48 changed files with 1367 additions and 178 deletions

View File

@ -49,4 +49,9 @@ if (ENABLE_REPLXX)
target_compile_definitions(replxx PUBLIC USE_REPLXX=1)
message (STATUS "Using replxx")
else ()
add_library(replxx INTERFACE)
target_compile_definitions(replxx PUBLIC USE_REPLXX=0)
message (STATUS "Not using replxx (Beware! Runtime fallback to readline is possible!)")
endif ()
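
Code built against this target can branch on the USE_REPLXX definition at compile time. A minimal sketch of that pattern (the readLine function is hypothetical; replxx.hxx and Replxx::input() are the upstream replxx API, the in-tree wrapper may differ):

#include <iostream>
#include <string>

#if USE_REPLXX
#    include <replxx.hxx>   /// bundled line editor, assumed to be on the include path
#endif

/// Read one line, preferring replxx when it was compiled in (USE_REPLXX=1).
std::string readLine(const std::string & prompt)
{
#if USE_REPLXX
    static replxx::Replxx rx;
    const char * line = rx.input(prompt);   /// history, highlighting, etc.
    return line ? std::string(line) : std::string();
#else
    std::cout << prompt;                    /// plain std::getline fallback
    std::string line;
    std::getline(std::cin, line);
    return line;
#endif
}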

View File

@ -606,7 +606,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
/// If exception was thrown during pipeline execution, skip it while processing other exception.
}
pipeline = QueryPipeline()
/// pipeline = QueryPipeline()
);
while (true)

View File

@ -112,7 +112,7 @@ void FileChecker::save() const
out->next();
}
disk->moveFile(tmp_files_info_path, files_info_path);
disk->replaceFile(tmp_files_info_path, files_info_path);
}
void FileChecker::load(Map & local_map, const String & path) const

View File

@ -15,7 +15,57 @@ namespace ErrorCodes
extern const int PATH_ACCESS_DENIED;
}
std::mutex DiskLocal::mutex;
std::mutex DiskLocal::reservation_mutex;
using DiskLocalPtr = std::shared_ptr<DiskLocal>;
class DiskLocalReservation : public IReservation
{
public:
DiskLocalReservation(const DiskLocalPtr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
void update(UInt64 new_size) override;
~DiskLocalReservation() override;
private:
DiskLocalPtr disk;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
class DiskLocalDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_) :
dir_path(dir_path_), iter(disk_path_ + dir_path_) {}
void next() override { ++iter; }
bool isValid() const override { return iter != Poco::DirectoryIterator(); }
String path() const override
{
if (iter->isDirectory())
return dir_path + iter.name() + '/';
else
return dir_path + iter.name();
}
private:
String dir_path;
Poco::DirectoryIterator iter;
};
ReservationPtr DiskLocal::reserve(UInt64 bytes)
{
@ -26,7 +76,7 @@ ReservationPtr DiskLocal::reserve(UInt64 bytes)
bool DiskLocal::tryReserve(UInt64 bytes)
{
std::lock_guard lock(mutex);
std::lock_guard lock(DiskLocal::reservation_mutex);
if (bytes == 0)
{
LOG_DEBUG(&Logger::get("DiskLocal"), "Reserving 0 bytes on disk " << backQuote(name));
@ -71,7 +121,7 @@ UInt64 DiskLocal::getAvailableSpace() const
UInt64 DiskLocal::getUnreservedSpace() const
{
std::lock_guard lock(mutex);
std::lock_guard lock(DiskLocal::reservation_mutex);
auto available_space = getAvailableSpace();
available_space -= std::min(available_space, reserved_bytes);
return available_space;
@ -161,20 +211,31 @@ std::unique_ptr<WriteBuffer> DiskLocal::writeFile(const String & path, size_t bu
return std::make_unique<WriteBufferFromFile>(disk_path + path, buf_size, flags);
}
void DiskLocal::remove(const String & path)
{
Poco::File(disk_path + path).remove(false);
}
void DiskLocal::removeRecursive(const String & path)
{
Poco::File(disk_path + path).remove(true);
}
void DiskLocalReservation::update(UInt64 new_size)
{
std::lock_guard lock(DiskLocal::mutex);
std::lock_guard lock(DiskLocal::reservation_mutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
}
DiskLocalReservation::~DiskLocalReservation()
{
try
{
std::lock_guard lock(DiskLocal::mutex);
std::lock_guard lock(DiskLocal::reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;

View File

@ -4,7 +4,6 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <mutex>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
@ -71,6 +70,10 @@ public:
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
private:
bool tryReserve(UInt64 bytes);
@ -79,61 +82,10 @@ private:
const String disk_path;
const UInt64 keep_free_space_bytes;
/// Used for reservation counters modification
static std::mutex mutex;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
static std::mutex reservation_mutex;
};
using DiskLocalPtr = std::shared_ptr<DiskLocal>;
class DiskLocalDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_) :
dir_path(dir_path_), iter(disk_path_ + dir_path_) {}
void next() override { ++iter; }
bool isValid() const override { return iter != Poco::DirectoryIterator(); }
String path() const override
{
if (iter->isDirectory())
return dir_path + iter.name() + '/';
else
return dir_path + iter.name();
}
private:
String dir_path;
Poco::DirectoryIterator iter;
};
class DiskLocalReservation : public IReservation
{
public:
DiskLocalReservation(const DiskLocalPtr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
void update(UInt64 new_size) override;
~DiskLocalReservation() override;
private:
DiskLocalPtr disk;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
class DiskFactory;
void registerDiskLocal(DiskFactory & factory);
}
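
The reservation classes moved into the .cpp above follow an RAII pattern: construction bumps the disk's reserved-bytes counter under the dedicated reservation_mutex, and the destructor releases it even on exceptional paths. A condensed, self-contained sketch of that accounting (simplified names, no CurrentMetrics, clamping at zero like the real destructor):

#include <algorithm>
#include <cstdint>
#include <memory>
#include <mutex>

struct Disk
{
    std::mutex reservation_mutex;   /// guards the two counters below
    uint64_t reserved_bytes = 0;
    uint64_t reservation_count = 0;
};

class Reservation
{
public:
    Reservation(std::shared_ptr<Disk> disk_, uint64_t size_) : disk(std::move(disk_)), size(size_)
    {
        std::lock_guard lock(disk->reservation_mutex);
        disk->reserved_bytes += size;
        ++disk->reservation_count;
    }

    void update(uint64_t new_size)   /// re-point the reservation at a new size
    {
        std::lock_guard lock(disk->reservation_mutex);
        disk->reserved_bytes -= size;
        size = new_size;
        disk->reserved_bytes += size;
    }

    ~Reservation()                   /// release; never underflow the shared counters
    {
        std::lock_guard lock(disk->reservation_mutex);
        disk->reserved_bytes -= std::min(disk->reserved_bytes, size);
        if (disk->reservation_count > 0)
            --disk->reservation_count;
    }

private:
    std::shared_ptr<Disk> disk;
    uint64_t size;
};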

View File

@ -16,6 +16,27 @@ namespace ErrorCodes
extern const int CANNOT_DELETE_DIRECTORY;
}
class DiskMemoryDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<String> && dir_file_paths_)
: dir_file_paths(std::move(dir_file_paths_)), iter(dir_file_paths.begin())
{
}
void next() override { ++iter; }
bool isValid() const override { return iter != dir_file_paths.end(); }
String path() const override { return *iter; }
private:
std::vector<String> dir_file_paths;
std::vector<String>::iterator iter;
};
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
{
throw Exception("Method reserve is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
@ -71,7 +92,7 @@ size_t DiskMemory::getFileSize(const String & path) const
auto iter = files.find(path);
if (iter == files.end())
throw Exception("File " + path + " does not exist", ErrorCodes::FILE_DOESNT_EXIST);
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return iter->second.data.size();
}
@ -86,7 +107,7 @@ void DiskMemory::createDirectory(const String & path)
String parent_path = parentPath(path);
if (!parent_path.empty() && files.find(parent_path) == files.end())
throw Exception(
"Failed to create directory " + path + ". Parent directory " + parent_path + " does not exist",
"Failed to create directory '" + path + "'. Parent directory " + parent_path + " does not exist",
ErrorCodes::DIRECTORY_DOESNT_EXIST);
files.emplace(path, FileData{FileType::Directory});
@ -116,7 +137,7 @@ void DiskMemory::clearDirectory(const String & path)
std::lock_guard lock(mutex);
if (files.find(path) == files.end())
throw Exception("Directory " + path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
throw Exception("Directory '" + path + "' does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
for (auto iter = files.begin(); iter != files.end();)
{
@ -128,7 +149,7 @@ void DiskMemory::clearDirectory(const String & path)
if (iter->second.type == FileType::Directory)
throw Exception(
"Failed to clear directory " + path + ". " + iter->first + " is a directory", ErrorCodes::CANNOT_DELETE_DIRECTORY);
"Failed to clear directory '" + path + "'. " + iter->first + " is a directory", ErrorCodes::CANNOT_DELETE_DIRECTORY);
files.erase(iter++);
}
@ -144,7 +165,7 @@ DiskDirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
std::lock_guard lock(mutex);
if (!path.empty() && files.find(path) == files.end())
throw Exception("Directory " + path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
throw Exception("Directory '" + path + "' does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
std::vector<String> dir_file_paths;
for (const auto & file : files)
@ -203,7 +224,7 @@ std::unique_ptr<ReadBuffer> DiskMemory::readFile(const String & path, size_t /*b
auto iter = files.find(path);
if (iter == files.end())
throw Exception("File " + path + " does not exist", ErrorCodes::FILE_DOESNT_EXIST);
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return std::make_unique<ReadBufferFromString>(iter->second.data);
}
@ -218,7 +239,7 @@ std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /
String parent_path = parentPath(path);
if (!parent_path.empty() && files.find(parent_path) == files.end())
throw Exception(
"Failed to create file " + path + ". Directory " + parent_path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
"Failed to create file '" + path + "'. Directory " + parent_path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
iter = files.emplace(path, FileData{FileType::File}).first;
}
@ -229,6 +250,46 @@ std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /
return std::make_unique<WriteBufferFromString>(iter->second.data);
}
void DiskMemory::remove(const String & path)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
if (file_it->second.type == FileType::Directory)
{
files.erase(file_it);
if (std::any_of(files.begin(), files.end(), [path](const auto & file) { return parentPath(file.first) == path; }))
throw Exception("Directory '" + path + "' is not empty", ErrorCodes::CANNOT_DELETE_DIRECTORY);
}
else
{
files.erase(file_it);
}
}
void DiskMemory::removeRecursive(const String & path)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
for (auto iter = files.begin(); iter != files.end();)
{
if (iter->first.size() >= path.size() && std::string_view(iter->first.data(), path.size()) == path)
iter = files.erase(iter);
else
++iter;
}
}
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;
void registerDiskMemory(DiskFactory & factory)
{

View File

@ -67,6 +67,10 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);
@ -93,30 +97,4 @@ private:
mutable std::mutex mutex;
};
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;
class DiskMemoryDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<String> && dir_file_paths_)
: dir_file_paths(std::move(dir_file_paths_)), iter(dir_file_paths.begin())
{
}
void next() override { ++iter; }
bool isValid() const override { return iter != dir_file_paths.end(); }
String path() const override { return *iter; }
private:
std::vector<String> dir_file_paths;
std::vector<String>::iterator iter;
};
class DiskFactory;
void registerDiskMemory(DiskFactory & factory);
}

dbms/src/Disks/DiskS3.cpp (new file, 439 lines)
View File

@ -0,0 +1,439 @@
#include "DiskS3.h"
#if USE_AWS_S3
# include "DiskFactory.h"
# include <random>
# include <IO/S3Common.h>
# include <IO/ReadBufferFromS3.h>
# include <IO/WriteBufferFromS3.h>
# include <IO/ReadBufferFromFile.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <Poco/File.h>
# include <Common/checkStackSize.h>
# include <Common/quoteString.h>
# include <Common/thread_local_rng.h>
# include <aws/s3/model/CopyObjectRequest.h>
# include <aws/s3/model/DeleteObjectRequest.h>
# include <aws/s3/model/GetObjectRequest.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_ALREADY_EXISTS;
extern const int FILE_DOESNT_EXIST;
extern const int PATH_ACCESS_DENIED;
}
namespace
{
template <typename Result, typename Error>
void throwIfError(Aws::Utils::Outcome<Result, Error> && response)
{
if (!response.IsSuccess())
{
const auto & err = response.GetError();
throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
}
}
String readKeyFromFile(const String & path)
{
String key;
ReadBufferFromFile buf(path, 1024); /* reasonable buffer size for small file */
readStringUntilEOF(key, buf);
return key;
}
void writeKeyToFile(const String & key, const String & path)
{
WriteBufferFromFile buf(path, 1024);
writeString(key, buf);
buf.next();
}
/// Stores data in S3 and the object key in file in local filesystem.
class WriteIndirectBufferFromS3 : public WriteBufferFromS3
{
public:
WriteIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> & client_ptr_,
const String & bucket_,
const String & metadata_path_,
const String & s3_path_,
size_t buf_size_)
: WriteBufferFromS3(client_ptr_, bucket_, s3_path_, DEFAULT_BLOCK_SIZE, buf_size_)
, metadata_path(metadata_path_)
, s3_path(s3_path_)
{
}
void finalize() override
{
WriteBufferFromS3::finalize();
writeKeyToFile(s3_path, metadata_path);
finalized = true;
}
~WriteIndirectBufferFromS3() override
{
if (finalized)
return;
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
private:
bool finalized = false;
const String metadata_path;
const String s3_path;
};
}
class DiskS3DirectoryIterator : public IDiskDirectoryIterator
{
public:
DiskS3DirectoryIterator(const String & full_path, const String & folder_path_) : iter(full_path), folder_path(folder_path_) {}
void next() override { ++iter; }
bool isValid() const override { return iter != Poco::DirectoryIterator(); }
String path() const override
{
if (iter->isDirectory())
return folder_path + iter.name() + '/';
else
return folder_path + iter.name();
}
private:
Poco::DirectoryIterator iter;
String folder_path;
};
using DiskS3Ptr = std::shared_ptr<DiskS3>;
class DiskS3Reservation : public IReservation
{
public:
DiskS3Reservation(const DiskS3Ptr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
void update(UInt64 new_size) override
{
std::lock_guard lock(disk->reservation_mutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
}
~DiskS3Reservation() override;
private:
DiskS3Ptr disk;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
DiskS3::DiskS3(String name_, std::shared_ptr<Aws::S3::S3Client> client_, String bucket_, String s3_root_path_, String metadata_path_)
: name(std::move(name_))
, client(std::move(client_))
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
{
}
ReservationPtr DiskS3::reserve(UInt64 bytes)
{
if (!tryReserve(bytes))
return {};
return std::make_unique<DiskS3Reservation>(std::static_pointer_cast<DiskS3>(shared_from_this()), bytes);
}
bool DiskS3::exists(const String & path) const
{
return Poco::File(metadata_path + path).exists();
}
bool DiskS3::isFile(const String & path) const
{
return Poco::File(metadata_path + path).isFile();
}
bool DiskS3::isDirectory(const String & path) const
{
return Poco::File(metadata_path + path).isDirectory();
}
size_t DiskS3::getFileSize(const String & path) const
{
Aws::S3::Model::GetObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
auto outcome = client->GetObject(request);
if (!outcome.IsSuccess())
{
auto & err = outcome.GetError();
throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
}
else
{
return outcome.GetResult().GetContentLength();
}
}
void DiskS3::createDirectory(const String & path)
{
Poco::File(metadata_path + path).createDirectory();
}
void DiskS3::createDirectories(const String & path)
{
Poco::File(metadata_path + path).createDirectories();
}
DiskDirectoryIteratorPtr DiskS3::iterateDirectory(const String & path)
{
return std::make_unique<DiskS3DirectoryIterator>(metadata_path + path, path);
}
void DiskS3::clearDirectory(const String & path)
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
if (isFile(it->path()))
remove(it->path());
}
void DiskS3::moveFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
throw Exception("File already exists " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path);
}
void DiskS3::replaceFile(const String & from_path, const String & to_path)
{
Poco::File from_file(metadata_path + from_path);
Poco::File to_file(metadata_path + to_path);
if (to_file.exists())
{
Poco::File tmp_file(metadata_path + to_path + ".old");
to_file.renameTo(tmp_file.path());
from_file.renameTo(metadata_path + to_path);
remove(to_path + ".old");
}
else
from_file.renameTo(to_file.path());
}
void DiskS3::copyFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
remove(to_path);
String s3_from_path = readKeyFromFile(metadata_path + from_path);
String s3_to_path = s3_root_path + getRandomName();
Aws::S3::Model::CopyObjectRequest req;
req.SetBucket(bucket);
req.SetCopySource(s3_from_path);
req.SetKey(s3_to_path);
throwIfError(client->CopyObject(req));
writeKeyToFile(s3_to_path, metadata_path + to_path);
}
std::unique_ptr<ReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
{
return std::make_unique<ReadBufferFromS3>(client, bucket, getS3Path(path), buf_size);
}
std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
if (!exists(path) || mode == WriteMode::Rewrite)
{
String new_s3_path = s3_root_path + getRandomName();
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path, new_s3_path, buf_size);
}
else
{
auto old_s3_path = getS3Path(path);
ReadBufferFromS3 read_buffer(client, bucket, old_s3_path, buf_size);
auto writeBuffer = std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path, old_s3_path, buf_size);
std::vector<char> buffer(buf_size);
while (!read_buffer.eof())
writeBuffer->write(buffer.data(), read_buffer.read(buffer.data(), buf_size));
return writeBuffer;
}
}
void DiskS3::remove(const String & path)
{
Poco::File file(metadata_path + path);
if (file.isFile())
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
throwIfError(client->DeleteObject(request));
}
file.remove();
}
void DiskS3::removeRecursive(const String & path)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
Poco::File file(metadata_path + path);
if (file.isFile())
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
throwIfError(client->DeleteObject(request));
}
else
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
removeRecursive(it->path());
}
file.remove();
}
String DiskS3::getS3Path(const String & path) const
{
if (!exists(path))
throw Exception("File not found: " + path, ErrorCodes::FILE_DOESNT_EXIST);
return readKeyFromFile(metadata_path + path);
}
String DiskS3::getRandomName() const
{
std::uniform_int_distribution<int> distribution('a', 'z');
String res(32, ' '); /// The number of bits of entropy should be not less than 128.
for (auto & c : res)
c = distribution(thread_local_rng);
return res;
}
bool DiskS3::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
if (bytes == 0)
{
LOG_DEBUG(&Logger::get("DiskS3"), "Reserving 0 bytes on s3 disk " << backQuote(name));
++reservation_count;
return true;
}
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_DEBUG(
&Logger::get("DiskS3"),
"Reserving " << formatReadableSizeWithBinarySuffix(bytes) << " on disk " << backQuote(name) << ", having unreserved "
<< formatReadableSizeWithBinarySuffix(unreserved_space) << ".");
++reservation_count;
reserved_bytes += bytes;
return true;
}
return false;
}
DiskS3Reservation::~DiskS3Reservation()
{
try
{
std::lock_guard lock(disk->reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(&Logger::get("DiskS3"), "Unbalanced reservations size for disk '" + disk->getName() + "'.");
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(&Logger::get("DiskS3"), "Unbalanced reservation count for disk '" + disk->getName() + "'.");
else
--disk->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void registerDiskS3(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr {
Poco::File disk{context.getPath() + "disks/" + name};
disk.createDirectories();
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
auto client = S3::ClientFactory::instance().create(
uri.endpoint,
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""));
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
String metadata_path = context.getPath() + "disks/" + name + "/";
auto s3disk = std::make_shared<DiskS3>(name, client, uri.bucket, uri.key, metadata_path);
/// This code is used only to check access to the corresponding disk.
{
auto file = s3disk->writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}
{
auto file = s3disk->readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read accecss to S3 bucket in disk " + name, ErrorCodes::PATH_ACCESS_DENIED);
}
{
s3disk->remove("test_acl");
}
return s3disk;
};
factory.registerDiskType("s3", creator);
}
}
#endif
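
replaceFile above is a replace-by-rename with a transient '.old' backup of the destination; the backup is removed afterwards (and with it, for DiskS3, the S3 object the old metadata file pointed to). The same dance expressed with std::filesystem, as a sketch only:

#include <filesystem>

namespace fs = std::filesystem;

/// Replace `to` with `from`, parking the old destination so it can be cleaned up.
void replaceFile(const fs::path & from, const fs::path & to)
{
    if (fs::exists(to))
    {
        fs::path backup = to;
        backup += ".old";
        fs::rename(to, backup);   /// park the old destination
        fs::rename(from, to);     /// move the new content into place
        fs::remove(backup);       /// then drop the parked copy
    }
    else
        fs::rename(from, to);
}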

dbms/src/Disks/DiskS3.h (new file, 93 lines)
View File

@ -0,0 +1,93 @@
#pragma once
#include <Common/config.h>
#if USE_AWS_S3
# include "DiskFactory.h"
# include <aws/s3/S3Client.h>
# include <Poco/DirectoryIterator.h>
namespace DB
{
/**
* Storage for persisting data in S3 and metadata on the local disk.
* Files are represented by a file in the local filesystem (clickhouse_root/disks/disk_name/path/to/file)
* that contains the S3 object key with the actual data.
*/
class DiskS3 : public IDisk
{
public:
friend class DiskS3Reservation;
DiskS3(String name_, std::shared_ptr<Aws::S3::S3Client> client_, String bucket_, String s3_root_path_, String metadata_path_);
const String & getName() const override { return name; }
const String & getPath() const override { return s3_root_path; }
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getKeepingFreeSpace() const override { return 0; }
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
void remove(const String & path) override;
void removeRecursive(const String & path) override;
private:
String getS3Path(const String & path) const;
String getRandomName() const;
bool tryReserve(UInt64 bytes);
private:
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
const String bucket;
const String s3_root_path;
const String metadata_path;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
};
}
#endif
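
The indirection described in the header comment (a small local metadata file that holds only the S3 object key) can be illustrated without the AWS SDK. A sketch of the key round-trip with plain streams; readKey/writeKey here are hypothetical stand-ins for readKeyFromFile/writeKeyToFile in DiskS3.cpp:

#include <fstream>
#include <string>

/// Local metadata file <- S3 object key.
void writeKey(const std::string & metadata_file, const std::string & s3_key)
{
    std::ofstream out(metadata_file, std::ios::trunc);
    out << s3_key;
}

/// S3 object key <- local metadata file.
std::string readKey(const std::string & metadata_file)
{
    std::ifstream in(metadata_file);
    std::string key;
    std::getline(in, key);   /// the file holds a single key and nothing else
    return key;
}

/// Resolving a logical path therefore means: check that the metadata file exists,
/// read the key from it, then issue the actual S3 request against that key.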

View File

@ -2,6 +2,7 @@
namespace DB
{
bool IDisk::isDirectoryEmpty(const String & path)
{
return !iterateDirectory(path)->isValid();

View File

@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <memory>
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include <Poco/Path.h>
@ -97,7 +98,7 @@ public:
/// Create directory and all parent directories if necessary.
virtual void createDirectories(const String & path) = 0;
/// Remove all files from the directory.
/// Remove all files from the directory. Directories are not removed.
virtual void clearDirectory(const String & path) = 0;
/// Move directory from `from_path` to `to_path`.
@ -125,6 +126,12 @@ public:
/// Open the file for write and return WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0;
/// Remove file or directory. Throws exception if file doesn't exist or if directory is not empty.
virtual void remove(const String & path) = 0;
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exist.
virtual void removeRecursive(const String & path) = 0;
};
using DiskPtr = std::shared_ptr<IDisk>;
@ -151,7 +158,7 @@ public:
/**
* Information about reserved size on particular disk.
*/
class IReservation
class IReservation : boost::noncopyable
{
public:
/// Get reservation size.

View File

@ -1,10 +1,16 @@
#include "DiskFactory.h"
#include "registerDisks.h"
#include "DiskFactory.h"
#include <Common/config.h>
namespace DB
{
void registerDiskLocal(DiskFactory & factory);
void registerDiskMemory(DiskFactory & factory);
#if USE_AWS_S3
void registerDiskS3(DiskFactory & factory);
#endif
void registerDisks()
{
@ -12,6 +18,9 @@ void registerDisks()
registerDiskLocal(factory);
registerDiskMemory(factory);
#if USE_AWS_S3
registerDiskS3(factory);
#endif
}
}

View File

@ -3,5 +3,4 @@
namespace DB
{
void registerDisks();
}

View File

@ -20,6 +20,10 @@ struct BitCountImpl
return __builtin_popcountll(a);
if constexpr (std::is_same_v<A, UInt32> || std::is_same_v<A, Int32> || std::is_unsigned_v<A>)
return __builtin_popcount(a);
if constexpr (std::is_same_v<A, Int16>)
return __builtin_popcount(static_cast<UInt16>(a));
if constexpr (std::is_same_v<A, Int8>)
return __builtin_popcount(static_cast<UInt8>(a));
else
return __builtin_popcountll(ext::bit_cast<unsigned long long>(a));
}
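
The casts added for Int16 and Int8 matter because a negative value sign-extends during integer promotion, which would report phantom set bits. A tiny check of the difference (GCC/Clang builtins assumed):

#include <cassert>
#include <cstdint>

int main()
{
    int8_t x = -1;                                             /// one byte, 8 set bits
    assert(__builtin_popcount(x) == 32);                       /// promoted/converted to 0xFFFFFFFF
    assert(__builtin_popcount(static_cast<uint8_t>(x)) == 8);  /// cast keeps only the low byte
}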

View File

@ -93,11 +93,10 @@ namespace S3
if (!endpoint.empty())
cfg.endpointOverride = endpoint;
auto cred_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(access_key_id,
secret_access_key);
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
return std::make_shared<Aws::S3::S3Client>(
std::move(cred_provider), // Credentials provider.
credentials, // Aws credentials.
std::move(cfg), // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, // Sign policy.
endpoint.empty() // Use virtual addressing only if endpoint is not specified.
@ -105,7 +104,7 @@ namespace S3
}
URI::URI(Poco::URI & uri_)
URI::URI(const Poco::URI & uri_)
{
static const std::regex BUCKET_KEY_PATTERN("([^/]+)/(.*)");

View File

@ -49,7 +49,7 @@ struct URI
String bucket;
String key;
explicit URI (Poco::URI & uri_);
explicit URI(const Poco::URI & uri_);
};
}

View File

@ -69,6 +69,7 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::finalize()
{
next();
temporary_buffer->finalize();
if (!buffer_string.empty())
{

View File

@ -781,7 +781,8 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
ManyAggregatedDataVariants & variants,
Arena * arena,
bool final,
size_t bucket) const
size_t bucket,
std::atomic<bool> * is_cancelled) const
{
auto & merged_data = *variants[0];
auto method = merged_data.type;
@ -792,6 +793,8 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \
return {}; \
block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \
}
@ -1482,12 +1485,15 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena, std::atomic<bool> * is_cancelled) const
{
/// We merge all aggregation results to the first.
AggregatedDataVariantsPtr & res = data[0];
for (size_t result_num = 1, size = data.size(); result_num < size; ++result_num)
{
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
return;
AggregatedDataVariants & current = *data[result_num];
mergeDataImpl<Method>(
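
The is_cancelled pointer threaded through mergeAndConvertOneBucketToBlock and mergeBucketImpl is cooperative cancellation: the merging code periodically polls a shared std::atomic<bool> and returns early when it is set. A generic sketch of the pattern (not the Aggregator API itself):

#include <atomic>
#include <thread>
#include <vector>

/// Merge buckets until done or until somebody flips the cancellation flag.
void mergeBuckets(const std::vector<int> & buckets, std::atomic<bool> * is_cancelled)
{
    for (int bucket : buckets)
    {
        if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
            return;          /// bail out, discarding partially merged state
        (void)bucket;        /// ... merge one bucket here ...
    }
}

int main()
{
    std::atomic<bool> cancelled{false};
    std::thread worker(mergeBuckets, std::vector<int>{1, 2, 3, 4}, &cancelled);
    cancelled.store(true);   /// e.g. the query was cancelled downstream
    worker.join();
}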

View File

@ -1170,7 +1170,8 @@ protected:
ManyAggregatedDataVariants & variants,
Arena * arena,
bool final,
size_t bucket) const;
size_t bucket,
std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
@ -1206,7 +1207,7 @@ protected:
template <typename Method>
void mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const;
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena, std::atomic<bool> * is_cancelled = nullptr) const;
template <typename Method>
void convertBlockToTwoLevelImpl(

View File

@ -1949,7 +1949,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
if (pipeline.getNumStreams() > 1)
{
/// Add resize transform to uniformly distribute data between aggregating streams.
pipeline.resize(pipeline.getNumStreams(), true);
if (!(storage && storage->hasEvenlyDistributedRead()))
pipeline.resize(pipeline.getNumStreams(), true, true);
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
auto merge_threads = settings.aggregation_memory_efficient_merge_threads

View File

@ -259,7 +259,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
/// In this method we have ownership on node.
auto & node = graph[pid];
bool need_traverse = false;
bool need_expand_pipeline = false;
std::vector<Edge *> updated_back_edges;
@ -290,13 +289,11 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
need_traverse = true;
node.status = ExecStatus::Idle;
break;
}
case IProcessor::Status::Finished:
{
need_traverse = true;
node.status = ExecStatus::Finished;
break;
}
@ -325,7 +322,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
}
}
if (need_traverse)
{
for (auto & edge_id : node.post_updated_input_ports)
{
@ -346,7 +342,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
}
}
if (need_traverse)
{
for (auto & edge : updated_direct_edges)
{
@ -543,7 +538,13 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (!task_queue.empty() && !threads_queue.empty() /*&& task_queue.quota() > threads_queue.size()*/)
{
auto thread_to_wake = threads_queue.pop_any();
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);
}
@ -627,9 +628,15 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
queue.pop();
}
if (!threads_queue.empty() /* && task_queue.quota() > threads_queue.size()*/)
if (!threads_queue.empty() && !finished /* && task_queue.quota() > threads_queue.size()*/)
{
auto thread_to_wake = threads_queue.pop_any();
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);

View File

@ -149,32 +149,37 @@ private:
++quota_;
}
ExecutionState * pop(size_t thread_num)
size_t getAnyThreadWithTasks(size_t from_thread = 0)
{
if (size_ == 0)
throw Exception("TaskQueue is not empty.", ErrorCodes::LOGICAL_ERROR);
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < queues.size(); ++i)
{
if (!queues[thread_num].empty())
{
ExecutionState * state = queues[thread_num].front();
queues[thread_num].pop();
if (!queues[from_thread].empty())
return from_thread;
--size_;
if (state->has_quota)
++quota_;
return state;
}
++thread_num;
if (thread_num >= queues.size())
thread_num = 0;
++from_thread;
if (from_thread >= queues.size())
from_thread = 0;
}
throw Exception("TaskQueue is not empty.", ErrorCodes::LOGICAL_ERROR);
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
}
ExecutionState * pop(size_t thread_num)
{
auto thread_with_tasks = getAnyThreadWithTasks(thread_num);
ExecutionState * state = queues[thread_with_tasks].front();
queues[thread_with_tasks].pop();
--size_;
if (state->has_quota)
++quota_;
return state;
}
size_t size() const { return size_; }
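
getAnyThreadWithTasks is a circular scan: start from a hint index (typically the next thread), walk the per-thread queues once around, and return the first non-empty one. A minimal sketch of that scan (simplified container, no quota bookkeeping):

#include <cstddef>
#include <queue>
#include <stdexcept>
#include <vector>

/// Index of the first non-empty queue, searching circularly starting at `from`.
size_t anyThreadWithTasks(const std::vector<std::queue<int>> & queues, size_t from)
{
    for (size_t i = 0; i < queues.size(); ++i)
    {
        if (!queues[from].empty())
            return from;
        from = (from + 1) % queues.size();
    }
    throw std::runtime_error("TaskQueue is empty.");
}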

View File

@ -33,6 +33,10 @@ ISimpleTransform::Status ISimpleTransform::prepare()
{
output.pushData(std::move(current_data));
transformed = false;
if (!no_more_data_needed)
return Status::PortFull;
}
/// Stop if don't need more data.
@ -52,12 +56,13 @@ ISimpleTransform::Status ISimpleTransform::prepare()
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_data = input.pullData();
current_data = input.pullData(true);
has_input = true;
if (current_data.exception)

View File

@ -161,12 +161,17 @@ protected:
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
}
void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags)
void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags, bool set_not_needed = false)
{
flags = data_.swap(data, 0, HAS_DATA);
uintptr_t mask = HAS_DATA;
if (set_not_needed)
mask |= IS_NEEDED;
flags = data_.swap(data, 0, mask);
/// It's ok to check because this flag can be changed only by pulling thread.
if (unlikely((flags & IS_NEEDED) == 0))
if (unlikely((flags & IS_NEEDED) == 0) && !set_not_needed)
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (unlikely((flags & HAS_DATA) == 0))
@ -266,14 +271,15 @@ private:
public:
using Port::Port;
Data ALWAYS_INLINE pullData()
Data ALWAYS_INLINE pullData(bool set_not_needed = false)
{
updateVersion();
if (!set_not_needed)
updateVersion();
assumeConnected();
std::uintptr_t flags = 0;
state->pull(data, flags);
state->pull(data, flags, set_not_needed);
is_finished = flags & State::IS_FINISHED;
@ -293,9 +299,9 @@ public:
return std::move(*data);
}
Chunk ALWAYS_INLINE pull()
Chunk ALWAYS_INLINE pull(bool set_not_needed = false)
{
auto data_ = pullData();
auto data_ = pullData(set_not_needed);
if (data_.exception)
std::rethrow_exception(data_.exception);

View File

@ -234,7 +234,7 @@ void QueryPipeline::addDelayedStream(ProcessorPtr source)
addPipe({ std::move(processor) });
}
void QueryPipeline::resize(size_t num_streams, bool force)
void QueryPipeline::resize(size_t num_streams, bool force, bool strict)
{
checkInitialized();
@ -243,7 +243,13 @@ void QueryPipeline::resize(size_t num_streams, bool force)
has_resize = true;
auto resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams);
ProcessorPtr resize;
if (strict)
resize = std::make_shared<StrictResizeProcessor>(current_header, getNumStreams(), num_streams);
else
resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams);
auto stream = streams.begin();
for (auto & input : resize->getInputs())
connect(**(stream++), input);

View File

@ -61,7 +61,7 @@ public:
/// Check if resize transform was used. (In that case another distinct transform will be added).
bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
void resize(size_t num_streams, bool force = false);
void resize(size_t num_streams, bool force = false, bool strict = false);
void enableQuotaForCurrentStreams();

View File

@ -1,5 +1,5 @@
#include <Processors/ResizeProcessor.h>
#include <iostream>
namespace DB
{
@ -257,5 +257,143 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs,
return Status::PortFull;
}
IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
{
if (!initialized)
{
initialized = true;
for (auto & input : inputs)
input_ports.push_back({.port = &input, .status = InputStatus::NotActive, .waiting_output = -1});
for (UInt64 i = 0; i < input_ports.size(); ++i)
disabled_input_ports.push(i);
for (auto & output : outputs)
output_ports.push_back({.port = &output, .status = OutputStatus::NotActive});
}
for (auto & output_number : updated_outputs)
{
auto & output = output_ports[output_number];
if (output.port->isFinished())
{
if (output.status != OutputStatus::Finished)
{
++num_finished_outputs;
output.status = OutputStatus::Finished;
}
continue;
}
if (output.port->canPush())
{
if (output.status != OutputStatus::NeedData)
{
output.status = OutputStatus::NeedData;
waiting_outputs.push(output_number);
}
}
}
if (num_finished_outputs == outputs.size())
{
for (auto & input : inputs)
input.close();
return Status::Finished;
}
std::queue<UInt64> inputs_with_data;
for (auto & input_number : updated_inputs)
{
auto & input = input_ports[input_number];
if (input.port->isFinished())
{
if (input.status != InputStatus::Finished)
{
input.status = InputStatus::Finished;
++num_finished_inputs;
waiting_outputs.push(input.waiting_output);
}
continue;
}
if (input.port->hasData())
{
if (input.status != InputStatus::NotActive)
{
input.status = InputStatus::NotActive;
inputs_with_data.push(input_number);
}
}
}
while (!inputs_with_data.empty())
{
auto input_number = inputs_with_data.front();
auto & input_with_data = input_ports[input_number];
inputs_with_data.pop();
if (input_with_data.waiting_output == -1)
throw Exception("No associated output for input with data.", ErrorCodes::LOGICAL_ERROR);
auto & waiting_output = output_ports[input_with_data.waiting_output];
if (waiting_output.status != OutputStatus::NeedData)
throw Exception("Invalid status for associated output.", ErrorCodes::LOGICAL_ERROR);
waiting_output.port->pushData(input_with_data.port->pullData(/* set_not_needed = */ true));
waiting_output.status = OutputStatus::NotActive;
if (input_with_data.port->isFinished())
{
input_with_data.status = InputStatus::Finished;
++num_finished_inputs;
}
else
disabled_input_ports.push(input_number);
}
if (num_finished_inputs == inputs.size())
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
/// Enable more inputs if needed.
while (!disabled_input_ports.empty() && !waiting_outputs.empty())
{
auto & input = input_ports[disabled_input_ports.front()];
disabled_input_ports.pop();
input.port->setNeeded();
input.status = InputStatus::NeedData;
input.waiting_output = waiting_outputs.front();
waiting_outputs.pop();
}
while (!waiting_outputs.empty())
{
auto & output = output_ports[waiting_outputs.front()];
waiting_outputs.pop();
output.status = OutputStatus::Finished;
output.port->finish();
++num_finished_outputs;
}
if (disabled_input_ports.empty())
return Status::NeedData;
return Status::PortFull;
}
}

View File

@ -74,4 +74,60 @@ private:
std::vector<OutputPortWithStatus> output_ports;
};
class StrictResizeProcessor : public IProcessor
{
public:
/// TODO Check that there is a non-zero number of inputs and outputs.
StrictResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header))
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "StrictResize"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;
private:
InputPorts::iterator current_input;
OutputPorts::iterator current_output;
size_t num_finished_inputs = 0;
size_t num_finished_outputs = 0;
std::queue<UInt64> disabled_input_ports;
std::queue<UInt64> waiting_outputs;
bool initialized = false;
enum class OutputStatus
{
NotActive,
NeedData,
Finished,
};
enum class InputStatus
{
NotActive,
NeedData,
Finished,
};
struct InputPortWithStatus
{
InputPort * port;
InputStatus status;
ssize_t waiting_output;
};
struct OutputPortWithStatus
{
OutputPort * port;
OutputStatus status;
};
std::vector<InputPortWithStatus> input_ports;
std::vector<OutputPortWithStatus> output_ports;
};
}

View File

@ -78,6 +78,7 @@ public:
{
std::atomic<UInt32> next_bucket_to_merge = 0;
std::array<std::atomic<Int32>, NUM_BUCKETS> source_for_bucket;
std::atomic<bool> is_cancelled = false;
SharedData()
{
@ -112,7 +113,7 @@ protected:
if (bucket_num >= NUM_BUCKETS)
return {};
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num);
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled);
Chunk chunk = convertToChunk(block);
shared_data->source_for_bucket[bucket_num] = source_number;
@ -201,6 +202,9 @@ public:
for (auto & input : inputs)
input.close();
if (shared_data)
shared_data->is_cancelled.store(true);
return Status::Finished;
}
@ -429,11 +433,16 @@ IProcessor::Status AggregatingTransform::prepare()
}
}
input.setNeeded();
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull();
if (is_consume_finished)
input.setNeeded();
current_chunk = input.pull(/*set_not_needed = */ !is_consume_finished);
read_current_chunk = true;
if (is_consume_finished)

View File

@ -74,16 +74,8 @@ IProcessor::Status MergingSortedTransform::prepare()
return Status::Finished;
}
if (!output.isNeeded())
{
for (auto & in : inputs)
in.setNotNeeded();
return Status::PortFull;
}
if (output.hasData())
return Status::PortFull;
/// Do not disable inputs, so it will work in the same way as with AsynchronousBlockInputStream, like before.
bool is_port_full = !output.canPush();
/// Special case for single input.
if (inputs.size() == 1)
@ -96,14 +88,20 @@ IProcessor::Status MergingSortedTransform::prepare()
}
input.setNeeded();
if (input.hasData())
output.push(input.pull());
{
if (!is_port_full)
output.push(input.pull());
return Status::PortFull;
}
return Status::NeedData;
}
/// Push if has data.
if (merged_data.mergedRows())
if (merged_data.mergedRows() && !is_port_full)
output.push(merged_data.pull());
if (!is_initialized)
@ -119,7 +117,7 @@ IProcessor::Status MergingSortedTransform::prepare()
if (!cursors[i].empty())
{
input.setNotNeeded();
// input.setNotNeeded();
continue;
}
@ -159,6 +157,10 @@ IProcessor::Status MergingSortedTransform::prepare()
{
if (is_finished)
{
if (is_port_full)
return Status::PortFull;
for (auto & input : inputs)
input.close();
@ -192,6 +194,9 @@ IProcessor::Status MergingSortedTransform::prepare()
need_data = false;
}
if (is_port_full)
return Status::PortFull;
return Status::Ready;
}
}

View File

@ -241,12 +241,13 @@ IProcessor::Status SortingTransform::prepareConsume()
if (input.isFinished())
return Status::Finished;
input.setNeeded();
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull();
current_chunk = input.pull(true);
}
/// Now consume.

View File

@ -114,6 +114,8 @@ public:
/// Returns true if the blocks shouldn't be pushed to associated views on insert.
virtual bool noPushingToViews() const { return false; }
virtual bool hasEvenlyDistributedRead() const { return false; }
/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;

View File

@ -425,6 +425,12 @@ void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWri
addFiles(column.name, *column.type);
}
void StorageTinyLog::drop(TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
disk->removeRecursive(table_path);
files.clear();
}
void registerStorageTinyLog(StorageFactory & factory)
{

View File

@ -46,6 +46,8 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void drop(TableStructureWriteLockHolder &) override;
protected:
StorageTinyLog(
DiskPtr disk_,

View File

@ -37,6 +37,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool hasEvenlyDistributedRead() const override { return true; }
private:
bool multithreaded;
bool even_distribution;

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<storage_configuration>
<disks>
<default>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</default>
</disks>
</storage_configuration>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,49 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Creates S3 bucket for tests and allows anonymous read-write access to it.
def prepare_s3_bucket(cluster):
minio_client = cluster.minio_client
if minio_client.bucket_exists(cluster.minio_bucket):
minio_client.remove_bucket(cluster.minio_bucket)
minio_client.make_bucket(cluster.minio_bucket)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", config_dir="configs", with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
yield cluster
finally:
cluster.shutdown()
def test_tinylog_s3(cluster):
node = cluster.instances["node"]
minio = cluster.minio_client
node.query("CREATE TABLE s3_test (id UInt64) Engine=TinyLog")
node.query("INSERT INTO s3_test SELECT number FROM numbers(3)")
assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2
node.query("INSERT INTO s3_test SELECT number + 3 FROM numbers(3)")
assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n5\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2
node.query("DROP TABLE s3_test")
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0

View File

@ -0,0 +1,16 @@
-- unmerged state
1 0000-00-00 00:00:00 0000-00-00 00:00:00 18446744073709551615 0
1 0000-00-00 00:00:00 2000-01-01 10:00:00 7200 0
1 0000-00-00 00:00:00 2000-01-01 11:10:00 600 0
1 2000-01-01 08:00:00 0000-00-00 00:00:00 18446744073709551615 0
1 2000-01-01 11:00:00 0000-00-00 00:00:00 18446744073709551615 3600
2 0000-00-00 00:00:00 0000-00-00 00:00:00 18446744073709551615 0
2 0000-00-00 00:00:00 2000-01-01 10:00:00 7200 0
2 0000-00-00 00:00:00 2000-01-01 11:10:00 600 0
2 0000-00-00 00:00:00 2001-01-01 11:10:02 1 0
2 2000-01-01 08:00:00 0000-00-00 00:00:00 18446744073709551615 0
2 2000-01-01 11:00:00 0000-00-00 00:00:00 18446744073709551615 3600
2 2001-01-01 11:10:01 0000-00-00 00:00:00 18446744073709551615 31622401
-- merged state
1 2000-01-01 11:00:00 2000-01-01 11:10:00 600 3600
2 2001-01-01 11:10:01 2001-01-01 11:10:02 1 31622401

View File

@ -0,0 +1,127 @@
SYSTEM STOP MERGES;
-- incremental streaming use case
-- which makes sense only if the data filling order guarantees chronological order
DROP TABLE IF EXISTS target_table;
DROP TABLE IF EXISTS logins;
DROP TABLE IF EXISTS mv_logins2target;
DROP TABLE IF EXISTS checkouts;
DROP TABLE IF EXISTS mv_checkouts2target;
-- that is the final table, which is filled incrementally from 2 different sources
CREATE TABLE target_table Engine=SummingMergeTree() ORDER BY id
AS
SELECT
number as id,
maxState( toDateTime(0) ) as latest_login_time,
maxState( toDateTime(0) ) as latest_checkout_time,
minState( toUInt64(-1) ) as fastest_session,
maxState( toUInt64(0) ) as biggest_inactivity_period
FROM numbers(50000)
GROUP BY id;
-- source table #1
CREATE TABLE logins (
id UInt64,
ts DateTime
) Engine=MergeTree ORDER BY id;
-- and mv with something like feedback from target table
CREATE MATERIALIZED VIEW mv_logins2target TO target_table
AS
SELECT
id,
maxState( ts ) as latest_login_time,
maxState( toDateTime(0) ) as latest_checkout_time,
minState( toUInt64(-1) ) as fastest_session,
if(max(current_latest_checkout_time) > 0, maxState(toUInt64(ts - current_latest_checkout_time)), maxState( toUInt64(0) ) ) as biggest_inactivity_period
FROM logins
LEFT JOIN (
SELECT
id,
maxMerge(latest_checkout_time) as current_latest_checkout_time
-- a normal MV sees only the incoming block, but we need something like feedback here,
-- so we join with the target table; the most important thing is that
-- we extract from the target table only the rows affected by that MV, referencing the src table
-- a second time
FROM target_table
WHERE id IN (SELECT id FROM logins)
GROUP BY id
) USING (id)
GROUP BY id;
-- the same for second pipeline
CREATE TABLE checkouts (
id UInt64,
ts DateTime
) Engine=MergeTree ORDER BY id;
CREATE MATERIALIZED VIEW mv_checkouts2target TO target_table
AS
SELECT
id,
maxState( toDateTime(0) ) as latest_login_time,
maxState( ts ) as latest_checkout_time,
if(max(current_latest_login_time) > 0, minState( toUInt64(ts - current_latest_login_time)), minState( toUInt64(-1) ) ) as fastest_session,
maxState( toUInt64(0) ) as biggest_inactivity_period
FROM checkouts
LEFT JOIN (SELECT id, maxMerge(latest_login_time) as current_latest_login_time FROM target_table WHERE id IN (SELECT id FROM checkouts) GROUP BY id) USING (id)
GROUP BY id;
-- feed with some initial values
INSERT INTO logins SELECT number as id, '2000-01-01 08:00:00' from numbers(50000);
INSERT INTO checkouts SELECT number as id, '2000-01-01 10:00:00' from numbers(50000);
-- ensure that we don't read whole target table during join
set max_rows_to_read = 2000;
INSERT INTO logins SELECT number as id, '2000-01-01 11:00:00' from numbers(1000);
INSERT INTO checkouts SELECT number as id, '2000-01-01 11:10:00' from numbers(1000);
set max_rows_to_read = 10;
INSERT INTO logins SELECT number+2 as id, '2001-01-01 11:10:01' from numbers(1);
INSERT INTO checkouts SELECT number+2 as id, '2001-01-01 11:10:02' from numbers(1);
set max_rows_to_read = 0;
select '-- unmerged state';
select
id,
finalizeAggregation(latest_login_time) as current_latest_login_time,
finalizeAggregation(latest_checkout_time) as current_latest_checkout_time,
finalizeAggregation(fastest_session) as current_fastest_session,
finalizeAggregation(biggest_inactivity_period) as current_biggest_inactivity_period
from target_table
where id in (1,2)
ORDER BY id, current_latest_login_time, current_latest_checkout_time;
select '-- merged state';
SELECT
id,
maxMerge(latest_login_time) as current_latest_login_time,
maxMerge(latest_checkout_time) as current_latest_checkout_time,
minMerge(fastest_session) as current_fastest_session,
maxMerge(biggest_inactivity_period) as current_biggest_inactivity_period
FROM target_table
where id in (1,2)
GROUP BY id
ORDER BY id;
DROP TABLE IF EXISTS logins;
DROP TABLE IF EXISTS mv_logins2target;
DROP TABLE IF EXISTS checkouts;
DROP TABLE IF EXISTS mv_checkouts2target;
SYSTEM START MERGES;

View File

@ -0,0 +1,7 @@
create temporary table t1 (a Nullable(UInt8));
insert into t1 values (2.4);
select * from t1;
create temporary table t2 (a UInt8);
insert into t2 values (2.4);
select * from t2;

View File

@ -6,7 +6,7 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
p7zip-full bash git ncdu wget psmisc python3 python3-pip tzdata python3-dev g++ \
p7zip-full bash git moreutils ncdu wget psmisc python3 python3-pip tzdata tree python3-dev g++ \
&& pip3 --no-cache-dir install clickhouse_driver \
&& apt-get purge --yes python3-dev g++ \
&& apt-get autoremove --yes \

View File

@ -23,8 +23,16 @@ function download
la="$left_pr-$left_sha.tgz"
ra="$right_pr-$right_sha.tgz"
wget -q -nd -c "https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/performance/performance.tgz" -O "$la" && tar -C left --strip-components=1 -zxvf "$la" &
wget -q -nd -c "https://clickhouse-builds.s3.yandex.net/$right_pr/$right_sha/performance/performance.tgz" -O "$ra" && tar -C right --strip-components=1 -zxvf "$ra" &
# might have the same version on left and right
if ! [ "$la" = "$ra" ]
then
wget -q -nd -c "https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/performance/performance.tgz" -O "$la" && tar -C left --strip-components=1 -zxvf "$la" &
wget -q -nd -c "https://clickhouse-builds.s3.yandex.net/$right_pr/$right_sha/performance/performance.tgz" -O "$ra" && tar -C right --strip-components=1 -zxvf "$ra" &
else
wget -q -nd -c "https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/performance/performance.tgz" -O "$la" && { tar -C left --strip-components=1 -zxvf "$la" & tar -C right --strip-components=1 -zxvf "$ra" & } &
fi
cd db0 && wget -q -nd -c "https://s3.mds.yandex.net/clickhouse-private-datasets/hits_10m_single/partitions/hits_10m_single.tar" && tar -xvf hits_10m_single.tar &
cd db0 && wget -q -nd -c "https://s3.mds.yandex.net/clickhouse-private-datasets/hits_100m_single/partitions/hits_100m_single.tar" && tar -xvf hits_100m_single.tar &
cd db0 && wget -q -nd -c "https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_v1.tar" && tar -xvf hits_v1.tar &
@ -119,7 +127,7 @@ function run_tests
"$script_dir/perf.py" --help > /dev/null
# FIXME remove some broken long tests
rm left/performance/IPv* ||:
rm left/performance/{IPv4,IPv6,modulo,parse_engine_file,number_formatting_formats,select_format}.xml ||:
# Run the tests
for test in left/performance/*.xml

View File

@ -1,7 +1,11 @@
#!/bin/bash
set -ex
cd /workspace
chown nobody workspace output
chgrp nogroup workspace output
chmod 777 workspace output
cd workspace
# We will compare to the most recent testing tag in master branch, let's find it.
rm -rf ch ||:
@ -22,7 +26,7 @@ set +e
# It's probably at fault for using `kill 0` as an error handling mechanism,
# but I can't be bothered to change this now.
set -m
../compare.sh 0 $ref_sha $PR_TO_TEST $SHA_TO_TEST 2>&1 | tee compare.log
time ../compare.sh 0 $ref_sha $PR_TO_TEST $SHA_TO_TEST 2>&1 | ts | tee compare.log
set +m
7z a /output/output.7z *.log *.tsv

View File

@ -6,6 +6,13 @@
#include <port/unistd.h>
#include <string.h>
/// We can detect whether the code is linked with one of the readline variants, or open the library dynamically.
#include <dlfcn.h>
extern "C"
{
char * readline(const char *) __attribute__((__weak__));
char * (*readline_ptr)(const char *) = readline;
}
namespace
{
@ -105,10 +112,37 @@ LineReader::InputStatus LineReader::readOneLine(const String & prompt)
{
input.clear();
std::cout << prompt;
std::getline(std::cin, input);
if (!std::cin.good())
return ABORT;
if (!readline_ptr)
{
for (auto name : {"libreadline.so", "libreadline.so.0", "libeditline.so", "libeditline.so.0"})
{
void * dl_handle = dlopen(name, RTLD_LAZY);
if (dl_handle)
{
readline_ptr = reinterpret_cast<char * (*)(const char *)>(dlsym(dl_handle, "readline"));
if (readline_ptr)
{
break;
}
}
}
}
/// Minimal support for readline
if (readline_ptr)
{
char * line_read = (*readline_ptr)(prompt.c_str());
if (!line_read)
return ABORT;
input = line_read;
}
else
{
std::cout << prompt;
std::getline(std::cin, input);
if (!std::cin.good())
return ABORT;
}
trim(input);
return INPUT_LINE;
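
The fallback above combines a weak reference to readline (non-null only if the binary happens to be linked against it) with a dlopen probe at run time. A stripped-down, standalone version of that resolution logic (GCC/Clang on an ELF platform assumed; link with -ldl):

#include <dlfcn.h>

extern "C" char * readline(const char *) __attribute__((__weak__));

using ReadlineFn = char * (*)(const char *);

/// Prefer a linked-in readline; otherwise try to dlopen a compatible library.
ReadlineFn resolveReadline()
{
    ReadlineFn fn = readline;   /// weak symbol: nullptr unless readline was linked in
    if (fn)
        return fn;
    for (const char * name : {"libreadline.so", "libreadline.so.0", "libeditline.so", "libeditline.so.0"})
        if (void * handle = dlopen(name, RTLD_LAZY))
            if ((fn = reinterpret_cast<ReadlineFn>(dlsym(handle, "readline"))))
                return fn;
    return nullptr;             /// caller falls back to std::getline, as above
}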