Merge remote-tracking branch 'upstream/master' into nikvas0/index

This commit is contained in:
Nikita Vasilev 2019-01-22 19:02:20 +03:00
commit d47cd4825d
44 changed files with 334 additions and 201 deletions

View File

@ -2,10 +2,10 @@
set(VERSION_REVISION 54413)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 1)
set(VERSION_PATCH 3)
set(VERSION_GITHASH ac0060079ab221278338db343ca9eaf006fc4ee1)
set(VERSION_DESCRIBE v19.1.3-testing)
set(VERSION_STRING 19.1.3)
set(VERSION_PATCH 4)
set(VERSION_GITHASH be762f58e1d0286c54609301cabc1934f49257fc)
set(VERSION_DESCRIBE v19.1.4-testing)
set(VERSION_STRING 19.1.4)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -25,12 +25,12 @@ namespace ErrorCodes
struct ConnectionParameters
{
String host;
UInt16 port;
UInt16 port{};
String default_database;
String user;
String password;
Protocol::Secure security;
Protocol::Compression compression;
Protocol::Secure security = Protocol::Secure::Disable;
Protocol::Compression compression = Protocol::Compression::Enable;
ConnectionTimeouts timeouts;
ConnectionParameters() {}

View File

@ -93,11 +93,12 @@ private:
{
std::stringstream ss;
ss << hint;
String item;
while (!ss.eof())
{
String item;
ss >> item;
if (item.empty())
if (ss.eof())
break;
if (item == "serverError")

View File

@ -86,7 +86,7 @@ public:
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
events_size = arguments.size();
events_size = static_cast<UInt8>(arguments.size());
}

View File

@ -43,7 +43,7 @@ CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_,
size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(codec_)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_))
{
}

View File

@ -81,11 +81,6 @@ CacheDictionary::CacheDictionary(
createAttributes();
}
CacheDictionary::CacheDictionary(const CacheDictionary & other)
: CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{
}
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
{

View File

@ -30,8 +30,6 @@ public:
const DictionaryLifetime dict_lifetime,
const size_t size);
CacheDictionary(const CacheDictionary & other);
std::exception_ptr getCreationException() const override { return {}; }
std::string getName() const override { return name; }
@ -53,7 +51,10 @@ public:
bool isCached() const override { return true; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<CacheDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<CacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, size);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

View File

@ -70,10 +70,6 @@ ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(
createAttributes();
}
ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(const ComplexKeyCacheDictionary & other)
: ComplexKeyCacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{
}
void ComplexKeyCacheDictionary::getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const

View File

@ -47,8 +47,6 @@ public:
const DictionaryLifetime dict_lifetime,
const size_t size);
ComplexKeyCacheDictionary(const ComplexKeyCacheDictionary & other);
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override { return {}; }
@ -76,7 +74,10 @@ public:
bool isCached() const override { return true; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<ComplexKeyCacheDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<ComplexKeyCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, size);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

View File

@ -43,12 +43,6 @@ ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
creation_time = std::chrono::system_clock::now();
}
ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other)
: ComplexKeyHashedDictionary{
other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
#define DECLARE(TYPE) \
void ComplexKeyHashedDictionary::get##TYPE( \
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ResultArrayType<TYPE> & out) const \

View File

@ -29,8 +29,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other);
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override { return creation_exception; }
@ -51,7 +49,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<ComplexKeyHashedDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<ComplexKeyHashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

View File

@ -50,12 +50,6 @@ FlatDictionary::FlatDictionary(
creation_time = std::chrono::system_clock::now();
}
FlatDictionary::FlatDictionary(const FlatDictionary & other)
: FlatDictionary{
other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
void FlatDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
{

View File

@ -28,8 +28,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
FlatDictionary(const FlatDictionary & other);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
@ -48,7 +46,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<FlatDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<FlatDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

View File

@ -44,12 +44,6 @@ HashedDictionary::HashedDictionary(
creation_time = std::chrono::system_clock::now();
}
HashedDictionary::HashedDictionary(const HashedDictionary & other)
: HashedDictionary{
other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
void HashedDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
{

View File

@ -27,8 +27,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
HashedDictionary(const HashedDictionary & other);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
@ -47,7 +45,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<HashedDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<HashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

View File

@ -3,6 +3,7 @@
#include <vector>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class IDictionarySource;

View File

@ -94,12 +94,6 @@ RangeHashedDictionary::RangeHashedDictionary(
creation_time = std::chrono::system_clock::now();
}
RangeHashedDictionary::RangeHashedDictionary(const RangeHashedDictionary & other)
: RangeHashedDictionary{
other.dictionary_name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
{
}
#define DECLARE_MULTIPLE_GETTER(TYPE) \
void RangeHashedDictionary::get##TYPE( \

View File

@ -24,8 +24,6 @@ public:
const DictionaryLifetime dict_lifetime,
bool require_nonempty);
RangeHashedDictionary(const RangeHashedDictionary & other);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return dictionary_name; }
@ -44,7 +42,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<RangeHashedDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<RangeHashedDictionary>(dictionary_name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

View File

@ -63,11 +63,6 @@ TrieDictionary::TrieDictionary(
creation_time = std::chrono::system_clock::now();
}
TrieDictionary::TrieDictionary(const TrieDictionary & other)
: TrieDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
{
}
TrieDictionary::~TrieDictionary()
{
btrie_destroy(trie);

View File

@ -29,8 +29,6 @@ public:
const DictionaryLifetime dict_lifetime,
bool require_nonempty);
TrieDictionary(const TrieDictionary & other);
~TrieDictionary() override;
std::string getKeyDescription() const { return key_description; }
@ -53,7 +51,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<TrieDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<TrieDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

View File

@ -904,7 +904,7 @@ public:
using T0 = typename Types::LeftType;
using T1 = typename Types::RightType;
if constexpr ((IsDecimalNumber<T0> && IsDecimalNumber<T1>) || (!IsDecimalNumber<T0> && !IsDecimalNumber<T1>))
if constexpr (IsDecimalNumber<T0> == IsDecimalNumber<T1>)
return executeTyped<T0, T1>(cond_col, block, arguments, result, input_rows_count);
else
throw Exception("Conditional function with Decimal and non Decimal", ErrorCodes::NOT_IMPLEMENTED);

View File

@ -104,18 +104,17 @@ String Cluster::Address::readableString() const
return res;
}
void Cluster::Address::fromString(const String & host_port_string, String & host_name, UInt16 & port)
std::pair<String, UInt16> Cluster::Address::fromString(const String & host_port_string)
{
auto pos = host_port_string.find_last_of(':');
if (pos == std::string::npos)
throw Exception("Incorrect <host>:<port> format " + host_port_string, ErrorCodes::SYNTAX_ERROR);
host_name = unescapeForFileName(host_port_string.substr(0, pos));
port = parse<UInt16>(host_port_string.substr(pos + 1));
return {unescapeForFileName(host_port_string.substr(0, pos)), parse<UInt16>(host_port_string.substr(pos + 1))};
}
String Cluster::Address::toStringFull() const
String Cluster::Address::toFullString() const
{
return
escapeForFileName(user) +
@ -126,6 +125,42 @@ String Cluster::Address::toStringFull() const
+ ((secure == Protocol::Secure::Enable) ? "+secure" : "");
}
Cluster::Address Cluster::Address::fromFullString(const String & full_string)
{
const char * address_begin = full_string.data();
const char * address_end = address_begin + full_string.size();
Protocol::Secure secure = Protocol::Secure::Disable;
const char * secure_tag = "+secure";
if (endsWith(full_string, secure_tag))
{
address_end -= strlen(secure_tag);
secure = Protocol::Secure::Enable;
}
const char * user_pw_end = strchr(full_string.data(), '@');
const char * colon = strchr(full_string.data(), ':');
if (!user_pw_end || !colon)
throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR);
const bool has_pw = colon < user_pw_end;
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception("Incorrect address '" + full_string + "', it does not contain port", ErrorCodes::SYNTAX_ERROR);
const char * has_db = strchr(full_string.data(), '#');
const char * port_end = has_db ? has_db : address_end;
Address address;
address.secure = secure;
address.port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end));
address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string();
return address;
}
/// Implementation of Clusters class
@ -202,7 +237,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
const auto weight = config.getInt(prefix + ".weight", default_weight);
addresses.emplace_back(config, prefix);
addresses.back().replica_num = 1;
const auto & address = addresses.back();
ShardInfo info;
@ -257,14 +291,13 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
if (startsWith(replica_key, "replica"))
{
replica_addresses.emplace_back(config, partial_prefix + replica_key);
replica_addresses.back().replica_num = current_replica_num;
++current_replica_num;
if (!replica_addresses.back().is_local)
{
if (internal_replication)
{
auto dir_name = replica_addresses.back().toStringFull();
auto dir_name = replica_addresses.back().toFullString();
if (first)
dir_name_for_internal_replication = dir_name;
else

View File

@ -59,7 +59,6 @@ public:
String password;
/// This database is selected when no database is specified for Distributed table
String default_database;
UInt32 replica_num;
/// The locality is determined at the initialization, and is not changed even if DNS is changed
bool is_local;
bool user_specified = false;
@ -79,10 +78,11 @@ public:
static String toString(const String & host_name, UInt16 port);
static void fromString(const String & host_port_string, String & host_name, UInt16 & port);
static std::pair<String, UInt16> fromString(const String & host_port_string);
/// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database
String toStringFull() const;
String toFullString() const;
static Address fromFullString(const String & address_full_string);
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const
@ -90,6 +90,9 @@ public:
return initially_resolved_address;
}
auto tuple() const { return std::tie(host_name, port, secure, user, password, default_database); }
bool operator==(const Address & other) const { return tuple() == other.tuple(); }
private:
Poco::Net::SocketAddress initially_resolved_address;
};

View File

@ -70,7 +70,7 @@ struct HostID
static HostID fromString(const String & host_port_str)
{
HostID res;
Cluster::Address::fromString(host_port_str, res.host_name, res.port);
std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
return res;
}
@ -1076,9 +1076,7 @@ public:
status.tryDeserializeText(status_data);
}
String host;
UInt16 port;
Cluster::Address::fromString(host_id, host, port);
auto [host, port] = Cluster::Address::fromString(host_id);
if (status.code != 0 && first_exception == nullptr)
first_exception = std::make_unique<Exception>("There was an error on [" + host + ":" + toString(port) + "]: " + status.message, status.code);

View File

@ -3,6 +3,7 @@
#include <chrono>
#include <string>
#include <memory>
#include <boost/noncopyable.hpp>
#include <Core/Types.h>
@ -26,7 +27,7 @@ struct ExternalLoadableLifetime final
/// Basic interface for external loadable objects. Is used in ExternalLoader.
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>, private boost::noncopyable
{
public:
virtual ~IExternalLoadable() = default;

View File

@ -157,7 +157,7 @@ void RequiredSourceColumnsMatcher::visit(const ASTFunction & node, const ASTPtr
local_aliases.push_back(name);
/// visit child with masked local aliases
visit(node.arguments->children[1], data);
RequiredSourceColumnsVisitor(data).visit(node.arguments->children[1]);
for (const auto & name : local_aliases)
data.private_aliases.erase(name);

View File

@ -354,8 +354,6 @@ std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, s
begin = pos;
ast = parseQueryAndMovePosition(parser, pos, end, "", true, 0);
if (!ast)
break;
ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(ast.get());

View File

@ -240,6 +240,12 @@ CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_
return codec->second;
}
CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_name) const
{
return getCodecOrDefault(column_name, CompressionCodecFactory::instance().getDefaultCodec());
}
ColumnsDescription ColumnsDescription::parse(const String & str)
{
ReadBufferFromString buf{str};

View File

@ -69,6 +69,8 @@ struct ColumnsDescription
CompressionCodecPtr getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const;
CompressionCodecPtr getCodecOrDefault(const String & column_name) const;
static ColumnsDescription parse(const String & str);
static const ColumnsDescription * loadFromContext(const Context & context, const String & db, const String & table);

View File

@ -48,40 +48,8 @@ namespace
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const auto address = boost::copy_range<std::string>(*it);
const char * address_begin = static_cast<const char*>(address.data());
const char * address_end = address_begin + address.size();
Protocol::Secure secure = Protocol::Secure::Disable;
const char * secure_tag = "+secure";
if (endsWith(address, secure_tag))
{
address_end -= strlen(secure_tag);
secure = Protocol::Secure::Enable;
}
const char * user_pw_end = strchr(address.data(), '@');
const char * colon = strchr(address.data(), ':');
if (!user_pw_end || !colon)
throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port#default_database' pattern",
ErrorCodes::INCORRECT_FILE_NAME};
const bool has_pw = colon < user_pw_end;
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception{"Shard address '" + address + "' does not contain port", ErrorCodes::INCORRECT_FILE_NAME};
const char * has_db = strchr(address.data(), '#');
const char * port_end = has_db ? has_db : address_end;
const auto user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
const auto password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
const auto host = unescapeForFileName(std::string(user_pw_end + 1, host_end));
const auto port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
const auto database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end))
: std::string();
pools.emplace_back(factory(host, port, secure, user, password, database));
Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range<std::string>(*it));
pools.emplace_back(factory(address));
}
return pools;
@ -175,17 +143,29 @@ void StorageDistributedDirectoryMonitor::run()
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
const auto pool_factory = [&storage, &timeouts] (const std::string & host, const UInt16 port,
const Protocol::Secure secure,
const std::string & user, const std::string & password,
const std::string & default_database)
const auto pool_factory = [&storage, &timeouts] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();
/// existing connections pool have a higher priority
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
const Cluster::Addresses & replicas_addresses = shards_addresses[shard_index];
for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index)
{
const Cluster::Address & replica_address = replicas_addresses[replica_index];
if (address == replica_address)
return shards_info[shard_index].per_replica_pools[replica_index];
}
}
return std::make_shared<ConnectionPool>(
1, host, port, default_database,
user, password, timeouts,
storage.getName() + '_' + user,
Protocol::Compression::Enable,
secure);
1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts,
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
};
auto pools = createPoolsForAddresses(name, pool_factory);

View File

@ -494,7 +494,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
if (!address.is_local)
dir_names.push_back(address.toStringFull());
dir_names.push_back(address.toFullString());
if (!dir_names.empty())
writeToShard(block, dir_names);

View File

@ -33,7 +33,7 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
min_compress_block_size(min_compress_block_size_),
max_compress_block_size(max_compress_block_size_),
aio_threshold(aio_threshold_),
codec(codec_)
codec(std::move(codec_))
{
}

View File

@ -144,9 +144,9 @@ private:
struct Stream
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionCodecFactory::instance().getDefaultCodec(), max_compress_block_size)
compressed(plain, std::move(codec), max_compress_block_size)
{
plain_offset = Poco::File(data_path).getSize();
}
@ -355,7 +355,12 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
if (written_streams.count(stream_name))
return;
streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size);
const auto & columns = storage.getColumns();
streams.try_emplace(
stream_name,
storage.files[stream_name].data_file.path(),
columns.getCodecOrDefault(name),
storage.max_compress_block_size);
}, settings.path);
settings.getter = createStreamGetter(name, written_streams);

View File

@ -135,9 +135,9 @@ private:
struct Stream
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionCodecFactory::instance().getDefaultCodec(), max_compress_block_size)
compressed(plain, std::move(codec), max_compress_block_size)
{
}
@ -244,8 +244,10 @@ IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const
if (!written_streams.insert(stream_name).second)
return nullptr;
const auto & columns = storage.getColumns();
if (!streams.count(stream_name))
streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(),
columns.getCodecOrDefault(name),
storage.max_compress_block_size);
return &streams[stream_name]->compressed;

View File

@ -26,44 +26,33 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes()
void StorageSystemClusters::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
auto updateColumns = [&](const std::string & cluster_name, const Cluster::ShardInfo & shard_info, const Cluster::Address & address)
for (const auto & name_and_cluster : context.getClusters().getContainer())
{
size_t i = 0;
res_columns[i++]->insert(cluster_name);
res_columns[i++]->insert(shard_info.shard_num);
res_columns[i++]->insert(shard_info.weight);
res_columns[i++]->insert(address.replica_num);
res_columns[i++]->insert(address.host_name);
res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString());
res_columns[i++]->insert(address.port);
res_columns[i++]->insert(shard_info.isLocal());
res_columns[i++]->insert(address.user);
res_columns[i++]->insert(address.default_database);
};
auto clusters = context.getClusters().getContainer();
for (const auto & entry : clusters)
{
const std::string cluster_name = entry.first;
const ClusterPtr cluster = entry.second;
const auto & addresses_with_failover = cluster->getShardsAddresses();
const String & cluster_name = name_and_cluster.first;
const ClusterPtr & cluster = name_and_cluster.second;
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
if (!addresses_with_failover.empty())
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
auto it1 = addresses_with_failover.cbegin();
auto it2 = shards_info.cbegin();
const auto & shard_info = shards_info[shard_index];
const auto & shard_addresses = addresses_with_failover[shard_index];
while (it1 != addresses_with_failover.cend())
for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index)
{
const auto & addresses = *it1;
const auto & shard_info = *it2;
size_t i = 0;
const auto & address = shard_addresses[replica_index];
for (const auto & address : addresses)
updateColumns(cluster_name, shard_info, address);
++it1;
++it2;
res_columns[i++]->insert(cluster_name);
res_columns[i++]->insert(shard_info.shard_num);
res_columns[i++]->insert(shard_info.weight);
res_columns[i++]->insert(replica_index + 1);
res_columns[i++]->insert(address.host_name);
res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString());
res_columns[i++]->insert(address.port);
res_columns[i++]->insert(shard_info.isLocal());
res_columns[i++]->insert(address.user);
res_columns[i++]->insert(address.default_database);
}
}
}

View File

@ -0,0 +1,26 @@
CREATE TABLE test.compression_codec_log ( id UInt64 CODEC(LZ4), data String CODEC(ZSTD(1)), ddd Date CODEC(NONE), somenum Float64 CODEC(ZSTD(2)), somestr FixedString(3) CODEC(LZ4HC(7)), othernum Int64 CODEC(Delta(8))) ENGINE = Log()
1 hello 2018-12-14 1.1 aaa 5
2 world 2018-12-15 2.2 bbb 6
3 ! 2018-12-16 3.3 ccc 7
2
CREATE TABLE test.compression_codec_multiple_log ( id UInt64 CODEC(LZ4, ZSTD(1), NONE, LZ4HC(0), Delta(4)), data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC(0), LZ4, LZ4, Delta(8)), ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD(1), LZ4HC(0), LZ4HC(0)), somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD(1))) ENGINE = Log()
1 world 2018-10-05 1.1
2 hello 2018-10-01 2.2
3 buy 2018-10-11 3.3
10003
10003
274972506.6
9175437371954010821
CREATE TABLE test.compression_codec_tiny_log ( id UInt64 CODEC(LZ4), data String CODEC(ZSTD(1)), ddd Date CODEC(NONE), somenum Float64 CODEC(ZSTD(2)), somestr FixedString(3) CODEC(LZ4HC(7)), othernum Int64 CODEC(Delta(8))) ENGINE = TinyLog()
1 hello 2018-12-14 1.1 aaa 5
2 world 2018-12-15 2.2 bbb 6
3 ! 2018-12-16 3.3 ccc 7
2
CREATE TABLE test.compression_codec_multiple_tiny_log ( id UInt64 CODEC(LZ4, ZSTD(1), NONE, LZ4HC(0), Delta(4)), data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC(0), LZ4, LZ4, Delta(8)), ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD(1), LZ4HC(0), LZ4HC(0)), somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD(1))) ENGINE = TinyLog()
1 world 2018-10-05 1.1
2 hello 2018-10-01 2.2
3 buy 2018-10-11 3.3
10003
10003
274972506.6
9175437371954010821

View File

@ -0,0 +1,118 @@
SET send_logs_level = 'none';
-- copy-paste for storage log
DROP TABLE IF EXISTS test.compression_codec_log;
CREATE TABLE test.compression_codec_log(
id UInt64 CODEC(LZ4),
data String CODEC(ZSTD),
ddd Date CODEC(NONE),
somenum Float64 CODEC(ZSTD(2)),
somestr FixedString(3) CODEC(LZ4HC(7)),
othernum Int64 CODEC(Delta)
) ENGINE = Log();
SHOW CREATE TABLE test.compression_codec_log;
INSERT INTO test.compression_codec_log VALUES(1, 'hello', toDate('2018-12-14'), 1.1, 'aaa', 5);
INSERT INTO test.compression_codec_log VALUES(2, 'world', toDate('2018-12-15'), 2.2, 'bbb', 6);
INSERT INTO test.compression_codec_log VALUES(3, '!', toDate('2018-12-16'), 3.3, 'ccc', 7);
SELECT * FROM test.compression_codec_log ORDER BY id;
INSERT INTO test.compression_codec_log VALUES(2, '', toDate('2018-12-13'), 4.4, 'ddd', 8);
DETACH TABLE test.compression_codec_log;
ATTACH TABLE test.compression_codec_log;
SELECT count(*) FROM test.compression_codec_log WHERE id = 2 GROUP BY id;
DROP TABLE IF EXISTS test.compression_codec_log;
DROP TABLE IF EXISTS test.compression_codec_multiple_log;
CREATE TABLE test.compression_codec_multiple_log (
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, Delta(4)),
data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC, LZ4, LZ4, Delta(8)),
ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD, LZ4HC, LZ4HC),
somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD)
) ENGINE = Log();
SHOW CREATE TABLE test.compression_codec_multiple_log;
INSERT INTO test.compression_codec_multiple_log VALUES (1, 'world', toDate('2018-10-05'), 1.1), (2, 'hello', toDate('2018-10-01'), 2.2), (3, 'buy', toDate('2018-10-11'), 3.3);
SELECT * FROM test.compression_codec_multiple_log ORDER BY id;
INSERT INTO test.compression_codec_multiple_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT count(*) FROM test.compression_codec_multiple_log;
SELECT count(distinct data) FROM test.compression_codec_multiple_log;
SELECT floor(sum(somenum), 1) FROM test.compression_codec_multiple_log;
TRUNCATE TABLE test.compression_codec_multiple_log;
INSERT INTO test.compression_codec_multiple_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT sum(cityHash64(*)) FROM test.compression_codec_multiple_log;
-- copy-paste for storage tiny log
DROP TABLE IF EXISTS test.compression_codec_tiny_log;
CREATE TABLE test.compression_codec_tiny_log(
id UInt64 CODEC(LZ4),
data String CODEC(ZSTD),
ddd Date CODEC(NONE),
somenum Float64 CODEC(ZSTD(2)),
somestr FixedString(3) CODEC(LZ4HC(7)),
othernum Int64 CODEC(Delta)
) ENGINE = TinyLog();
SHOW CREATE TABLE test.compression_codec_tiny_log;
INSERT INTO test.compression_codec_tiny_log VALUES(1, 'hello', toDate('2018-12-14'), 1.1, 'aaa', 5);
INSERT INTO test.compression_codec_tiny_log VALUES(2, 'world', toDate('2018-12-15'), 2.2, 'bbb', 6);
INSERT INTO test.compression_codec_tiny_log VALUES(3, '!', toDate('2018-12-16'), 3.3, 'ccc', 7);
SELECT * FROM test.compression_codec_tiny_log ORDER BY id;
INSERT INTO test.compression_codec_tiny_log VALUES(2, '', toDate('2018-12-13'), 4.4, 'ddd', 8);
DETACH TABLE test.compression_codec_tiny_log;
ATTACH TABLE test.compression_codec_tiny_log;
SELECT count(*) FROM test.compression_codec_tiny_log WHERE id = 2 GROUP BY id;
DROP TABLE IF EXISTS test.compression_codec_tiny_log;
DROP TABLE IF EXISTS test.compression_codec_multiple_tiny_log;
CREATE TABLE test.compression_codec_multiple_tiny_log (
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, Delta(4)),
data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC, LZ4, LZ4, Delta(8)),
ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD, LZ4HC, LZ4HC),
somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD)
) ENGINE = TinyLog();
SHOW CREATE TABLE test.compression_codec_multiple_tiny_log;
INSERT INTO test.compression_codec_multiple_tiny_log VALUES (1, 'world', toDate('2018-10-05'), 1.1), (2, 'hello', toDate('2018-10-01'), 2.2), (3, 'buy', toDate('2018-10-11'), 3.3);
SELECT * FROM test.compression_codec_multiple_tiny_log ORDER BY id;
INSERT INTO test.compression_codec_multiple_tiny_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT count(*) FROM test.compression_codec_multiple_tiny_log;
SELECT count(distinct data) FROM test.compression_codec_multiple_tiny_log;
SELECT floor(sum(somenum), 1) FROM test.compression_codec_multiple_tiny_log;
TRUNCATE TABLE test.compression_codec_multiple_tiny_log;
INSERT INTO test.compression_codec_multiple_tiny_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT sum(cityHash64(*)) FROM test.compression_codec_multiple_tiny_log;

View File

@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS test.sign (Sign Int8, Arr Array(Int8)) ENGINE = Memory;
SELECT arrayMap(x -> x * Sign, Arr) FROM test.sign;
DROP TABLE test.sign;

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (19.1.3) unstable; urgency=low
clickhouse (19.1.4) unstable; urgency=low
* Modified source code
-- <root@yandex-team.ru> Mon, 21 Jan 2019 16:26:13 +0300
-- <root@yandex-team.ru> Tue, 22 Jan 2019 02:29:09 +0300

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.1.3
ARG version=19.1.4
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.1.3
ARG version=19.1.4
ARG gosu_ver=1.10
RUN apt-get update \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.1.3
ARG version=19.1.4
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -14,7 +14,6 @@ The table below lists supported formats and how they can be used in `INSERT` and
| [CSVWithNames](#csvwithnames) | ✔ | ✔ |
| [Values](#values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ |
| [VerticalRaw](#verticalraw) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
@ -345,6 +344,8 @@ Each result block is output as a separate table. This is necessary so that block
[NULL](../query_language/syntax.md) is output as `ᴺᵁᴸᴸ`.
Example (shown for the [PrettyCompact](#prettycompact) format):
``` sql
SELECT * FROM t_null
```
@ -355,10 +356,22 @@ SELECT * FROM t_null
└───┴──────┘
```
Rows are not escaped in Pretty* formats. Example is shown for the [PrettyCompact](#prettycompact) format:
``` sql
SELECT 'String with \'quotes\' and \t character' AS Escaping_test
```
```
┌─Escaping_test────────────────────────┐
│ String with 'quotes' and character │
└──────────────────────────────────────┘
```
To avoid dumping too much data to the terminal, only the first 10,000 rows are printed. If the number of rows is greater than or equal to 10,000, the message "Showed first 10 000" is printed.
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table).
The Pretty format supports outputting total values (when using WITH TOTALS) and extremes (when 'extremes' is set to 1). In these cases, total values and extreme values are output after the main data, in separate tables. Example (shown for the PrettyCompact format):
The Pretty format supports outputting total values (when using WITH TOTALS) and extremes (when 'extremes' is set to 1). In these cases, total values and extreme values are output after the main data, in separate tables. Example (shown for the [PrettyCompact](#prettycompact) format):
``` sql
SELECT EventDate, count() AS c FROM test.hits GROUP BY EventDate WITH TOTALS ORDER BY EventDate FORMAT PrettyCompact
@ -389,7 +402,7 @@ Extremes:
## PrettyCompact {#prettycompact}
Differs from `Pretty` in that the grid is drawn between rows and the result is more compact.
Differs from [Pretty](#pretty) in that the grid is drawn between rows and the result is more compact.
This format is used by default in the command-line client in interactive mode.
## PrettyCompactMonoBlock {#prettycompactmonoblock}
@ -461,37 +474,20 @@ Row 1:
x: 1
y: ᴺᵁᴸᴸ
```
Rows are not escaped in Vertical format:
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table).
## VerticalRaw {#verticalraw}
Differs from `Vertical` format in that the rows are not escaped.
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table).
Examples:
``` sql
SELECT 'string with \'quotes\' and \t with some special \n characters' AS test FORMAT Vertical
```
```
:) SHOW CREATE TABLE geonames FORMAT VerticalRaw;
Row 1:
──────
statement: CREATE TABLE default.geonames ( geonameid UInt32, date Date DEFAULT CAST('2017-12-08' AS Date)) ENGINE = MergeTree(date, geonameid, 8192)
:) SELECT 'string with \'quotes\' and \t with some special \n characters' AS test FORMAT VerticalRaw;
Row 1:
──────
test: string with 'quotes' and with some special
test: string with 'quotes' and with some special
characters
```
Compare with the Vertical format:
```
:) SELECT 'string with \'quotes\' and \t with some special \n characters' AS test FORMAT Vertical;
Row 1:
──────
test: string with \'quotes\' and \t with some special \n characters
```
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table).
## XML {#xml}