mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #59969 from ClickHouse/no-aerospike
We don't have external dictionaries from Aerospike
This commit is contained in:
commit
4fb76a6e25
@ -62,7 +62,6 @@ RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - \
|
||||
# kazoo 2.10.0 is broken
|
||||
# https://s3.amazonaws.com/clickhouse-test-reports/59337/524625a1d2f4cc608a3f1059e3df2c30f353a649/integration_tests__asan__analyzer__[5_6].html
|
||||
RUN python3 -m pip install --no-cache-dir \
|
||||
aerospike==11.1.0 \
|
||||
PyMySQL==1.1.0 \
|
||||
asyncio==3.4.3 \
|
||||
avro==1.10.2 \
|
||||
|
@ -19,7 +19,6 @@ Don't use Docker from your system repository.
|
||||
```
|
||||
sudo -H pip install \
|
||||
PyMySQL \
|
||||
aerospike \
|
||||
avro \
|
||||
cassandra-driver \
|
||||
confluent-kafka \
|
||||
|
@ -4,7 +4,6 @@ import os
|
||||
import uuid
|
||||
import warnings
|
||||
|
||||
import aerospike
|
||||
import cassandra.cluster
|
||||
import pymongo
|
||||
import pymysql.cursors
|
||||
@ -696,91 +695,3 @@ class SourceRedis(ExternalSource):
|
||||
or layout.is_complex
|
||||
and self.storage_type == "hash_map"
|
||||
)
|
||||
|
||||
|
||||
class SourceAerospike(ExternalSource):
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
internal_hostname,
|
||||
internal_port,
|
||||
docker_hostname,
|
||||
docker_port,
|
||||
user,
|
||||
password,
|
||||
):
|
||||
ExternalSource.__init__(
|
||||
self,
|
||||
name,
|
||||
internal_hostname,
|
||||
internal_port,
|
||||
docker_hostname,
|
||||
docker_port,
|
||||
user,
|
||||
password,
|
||||
)
|
||||
self.namespace = "test"
|
||||
self.set = "test_set"
|
||||
|
||||
def get_source_str(self, table_name):
|
||||
print("AEROSPIKE get source str")
|
||||
return """
|
||||
<aerospike>
|
||||
<host>{host}</host>
|
||||
<port>{port}</port>
|
||||
</aerospike>
|
||||
""".format(
|
||||
host=self.docker_hostname,
|
||||
port=self.docker_port,
|
||||
)
|
||||
|
||||
def prepare(self, structure, table_name, cluster):
|
||||
config = {"hosts": [(self.internal_hostname, self.internal_port)]}
|
||||
self.client = aerospike.client(config).connect()
|
||||
self.prepared = True
|
||||
print("PREPARED AEROSPIKE")
|
||||
print(config)
|
||||
|
||||
def compatible_with_layout(self, layout):
|
||||
print("compatible AEROSPIKE")
|
||||
return layout.is_simple
|
||||
|
||||
def _flush_aerospike_db(self):
|
||||
keys = []
|
||||
|
||||
def handle_record(xxx_todo_changeme):
|
||||
(key, metadata, record) = xxx_todo_changeme
|
||||
print(("Handle record {} {}".format(key, record)))
|
||||
keys.append(key)
|
||||
|
||||
def print_record(xxx_todo_changeme1):
|
||||
(key, metadata, record) = xxx_todo_changeme1
|
||||
print(("Print record {} {}".format(key, record)))
|
||||
|
||||
scan = self.client.scan(self.namespace, self.set)
|
||||
scan.foreach(handle_record)
|
||||
|
||||
[self.client.remove(key) for key in keys]
|
||||
|
||||
def load_kv_data(self, values):
|
||||
self._flush_aerospike_db()
|
||||
|
||||
print("Load KV Data Aerospike")
|
||||
if len(values[0]) == 2:
|
||||
for value in values:
|
||||
key = (self.namespace, self.set, value[0])
|
||||
print(key)
|
||||
self.client.put(
|
||||
key,
|
||||
{"bin_value": value[1]},
|
||||
policy={"key": aerospike.POLICY_KEY_SEND},
|
||||
)
|
||||
assert self.client.exists(key)
|
||||
else:
|
||||
assert "VALUES SIZE != 2"
|
||||
|
||||
# print(values)
|
||||
|
||||
def load_data(self, data, table_name):
|
||||
print("Load Data Aerospike")
|
||||
# print(data)
|
||||
|
Loading…
Reference in New Issue
Block a user