mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Fix compilation errors
This commit is contained in:
parent
cec08ed148
commit
3175caa1c0
@ -1,11 +1,14 @@
|
||||
#include <Common/config.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
# include <Common/config.h>
|
||||
# include <Columns/ColumnNullable.h>
|
||||
# include <Columns/ColumnString.h>
|
||||
# include <Columns/ColumnsNumber.h>
|
||||
# include <Core/ExternalResultDescription.h>
|
||||
# include <Columns/ColumnString.h>
|
||||
# include <IO/ReadHelpers.h>
|
||||
|
||||
#if USE_CASSANDRA
|
||||
|
||||
# include <utility>
|
||||
# include "CassandraBlockInputStream.h"
|
||||
# include "CassandraBlockInputStream.h"
|
||||
|
||||
@ -24,11 +27,11 @@ CassandraBlockInputStream::CassandraBlockInputStream(
|
||||
const DB::Block &sample_block,
|
||||
const size_t max_block_size)
|
||||
: session{session}
|
||||
, statement{cass_statement_new(query_str.c_str(), 0)}
|
||||
, query_str{query_str}
|
||||
, max_block_size{max_block_size}
|
||||
{
|
||||
CassStatement * statement = cass_statement_new(query_str.c_str(), 0);
|
||||
cass_statement_set_paging_size(statement, max_block_size)
|
||||
cass_statement_set_paging_size(statement, max_block_size);
|
||||
this->has_more_pages = cass_true;
|
||||
|
||||
description.init(sample_block);
|
||||
@ -130,33 +133,37 @@ namespace
|
||||
{
|
||||
cass_int64_t _value;
|
||||
cass_value_get_int64(value, &_value);
|
||||
static_cast<ColumnUInt32 &>(column).insertValue(UInt32{cass_date_from_epoch(_value)});
|
||||
static_cast<ColumnUInt16 &>(column).insertValue(UInt32{cass_date_from_epoch(_value)}); // FIXME
|
||||
break;
|
||||
}
|
||||
case ValueType::DateTime:
|
||||
{
|
||||
cass_int64_t _value;
|
||||
cass_value_get_int64(value, &_value);
|
||||
static_cast<ColumnUInt64 &>(column).insertValue(_value);
|
||||
static_cast<ColumnUInt32 &>(column).insertValue(_value);
|
||||
break;
|
||||
}
|
||||
case ValueType::UUID:
|
||||
{
|
||||
CassUuid _value;
|
||||
cass_value_get_uuid(value, &_value);
|
||||
static_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
|
||||
std::array<char, CASS_UUID_STRING_LENGTH> uuid_str;
|
||||
cass_uuid_string(_value, uuid_str.data());
|
||||
static_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }
|
||||
// void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }
|
||||
|
||||
Block CassandraBlockInputStream::readImpl()
|
||||
{
|
||||
if (has_more_pages)
|
||||
return {};
|
||||
|
||||
MutableColumns columns(description.sample_block.columns());
|
||||
CassFuture* query_future = cass_session_execute(session, statement);
|
||||
|
||||
const CassResult* result = cass_future_get_result(query_future);
|
||||
@ -164,7 +171,7 @@ namespace
|
||||
if (result == nullptr) {
|
||||
const char* error_message;
|
||||
size_t error_message_length;
|
||||
cass_future_error_message(future, &error_message, &error_message_length);
|
||||
cass_future_error_message(query_future, &error_message, &error_message_length);
|
||||
|
||||
throw Exception{error_message, ErrorCodes::CASSANDRA_INTERNAL_ERROR};
|
||||
}
|
||||
@ -175,7 +182,8 @@ namespace
|
||||
while (cass_iterator_next(iterator)) {
|
||||
const CassValue* _key = cass_iterator_get_map_key(iterator);
|
||||
const CassValue* _value = cass_iterator_get_map_value(iterator);
|
||||
for (const auto &[value, idx]: {{_key, 0}, {_value, 1}}) {
|
||||
auto pair_values = {std::make_pair(_key, 0ul), std::make_pair(_value, 1ul)};
|
||||
for (const auto &[value, idx]: pair_values) {
|
||||
if (description.types[idx].second) {
|
||||
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
|
||||
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
|
||||
@ -193,8 +201,9 @@ namespace
|
||||
}
|
||||
|
||||
cass_result_free(result);
|
||||
|
||||
return description.sample_block.cloneWithColumns(std::move(columns));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -1,10 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <cassandra.h>
|
||||
#include <Core/Block.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include "ExternalResultDescription.h"
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -27,8 +26,9 @@ namespace DB
|
||||
private:
|
||||
Block readImpl() override;
|
||||
|
||||
CassSession * session,
|
||||
const std::string & query_str;
|
||||
CassSession * session;
|
||||
CassStatement * statement;
|
||||
String query_str;
|
||||
const size_t max_block_size;
|
||||
ExternalResultDescription description;
|
||||
const CassResult * result;
|
||||
|
@ -35,6 +35,8 @@ namespace DB
|
||||
#if USE_CASSANDRA
|
||||
|
||||
# include <cassandra.h>
|
||||
# include <IO/WriteHelpers.h>
|
||||
# include "CassandraBlockInputStream.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -106,7 +108,13 @@ std::string CassandraDictionarySource::toConnectionString(const std::string &hos
|
||||
return host + (port != 0 ? ":" + std::to_string(port) : "");
|
||||
}
|
||||
|
||||
BlockInputStreamPtr CassandraDict
|
||||
BlockInputStreamPtr CassandraDictionarySource::loadAll() {
|
||||
return std::make_shared<CassandraBlockInputStream>(nullptr, "", sample_block, max_block_size);
|
||||
}
|
||||
|
||||
std::string CassandraDictionarySource::toString() const {
|
||||
return "Cassandra: " + /*db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + */ host + ':' + DB::toString(port);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/config.h>
|
||||
#include <Core/Block.h>
|
||||
|
||||
#if USE_CASSANDRA
|
||||
|
||||
@ -34,9 +35,25 @@ public:
|
||||
|
||||
BlockInputStreamPtr loadAll() override;
|
||||
|
||||
bool supportsSelectiveLoad() const override { return true; }
|
||||
|
||||
bool isModified() const override { return true; }
|
||||
|
||||
///Not yet supported
|
||||
bool hasUpdateField() const override { return false; }
|
||||
|
||||
DictionarySourcePtr clone() const override { return std::make_unique<CassandraDictionarySource>(*this); }
|
||||
|
||||
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
|
||||
|
||||
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
|
||||
|
||||
BlockInputStreamPtr loadUpdatedAll() override
|
||||
{
|
||||
throw Exception{"Method loadUpdatedAll is unsupported for CassandraDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
|
||||
}
|
||||
|
||||
std::string toString() const override;
|
||||
|
||||
private:
|
||||
static std::string toConnectionString(const std::string& host, const UInt16 port);
|
||||
|
Loading…
Reference in New Issue
Block a user