Python에서 Cassandra DB를 사용하기 위한 도구로는
Pycassa(https://github.com/pycassa/pycassa)와
cassandra-dbapi2(https://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/)가 대표적이다.
Pycassa는 커넥션 풀 기능이 좋으나 CQL3를 사용하기가 번거롭고, cassandra-dbapi2는 CQL3를 사용하기 쉬우나 커넥션 풀 기능이 약하다.
Java로 프로그래밍한다면 JDBC 드라이버를 내려받아 사용하면 되지만, Python을 포기하기 싫어서 이래저래 알아보던 중 cassandra-dbapi2 코드 중 일부를 가져와 Pycassa에서 CQL3를 쉽게 사용할 수 있도록 플러그인(?) 같은 것을 만들었다.
(만들었다기보다는... 짜깁기했다? 적당히 긁어왔다? 정도가 맞는 것 같다.)
(2013.06.18 업데이트 - 컬렉션(set, list, map) 타입도 제대로 조회되도록 수정)
사용하려면 우선 Pycassa 1.9.0 이상 버전이 설치되어 있어야 한다. (상단의 링크를 따라 들어가서 설치한다.)
사용 방법은 다음과 같다.
from pycassa import ConnectionPool
from pycassa_plugin import PycassaTable, PycassaTableResultType

# The third argument is the credentials dict; replace the placeholder
# username/password values (only needed when authentication is enabled).
pool = ConnectionPool('demodb', ['localhost:9160'], {'username':'아이디가 있다면', 'password':'비밀번호'})
new_table = PycassaTable(pool)

# INSERT / UPDATE / DELETE
new_table.execute_cql("INSERT INTO EMP(EMPID, DEPTID, FIRST_NAME, LAST_NAME) VALUES(48, 630, 'EY', 'Shin')")

# SELECT: return_type (PycassaTableResultType.LIST or PycassaTableResultType.DICT)
# chooses whether each row comes back as a list or as a dict.
resultSet = new_table.get_result("SELECT * FROM EMP WHERE EMPID = :empid", {"empid" : 48}, return_type = PycassaTableResultType.LIST)
# print(...) with a single argument behaves the same on Python 2 and 3.
print(resultSet)
소스는 아래와 같다.
#-*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Created on 2013. 6. 12.
@author: purepleya
almost code below is from cql1.4.0
https://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/
https://pypi.python.org/pypi/cql/1.4.0
'''
import re
import struct
from pycassa.cassandra.ttypes import Compression, ConsistencyLevel, CqlResultType
def get_collection(adapter, subtype, value):
    """Decode a serialized set/list column value.

    Layout: a big-endian unsigned 16-bit element count, then for each element
    a big-endian unsigned 16-bit length followed by that many payload bytes.
    Every payload is decoded with ``decode_value([subtype], ...)`` and the
    decoded items, in order, are passed to ``adapter`` (e.g. ``list``).
    """
    count = uint16_unpack(value[:2])
    offset = 2
    items = []
    for _ in range(count):
        size = uint16_unpack(value[offset:offset + 2])
        start = offset + 2
        offset = start + size
        items.append(decode_value([subtype], value[start:offset]))
    return adapter(items)
def get_collection_map(subkeytype, subvaltype, value):
    """Decode a serialized map column value into a dict.

    Layout: a big-endian unsigned 16-bit entry count, then for each entry a
    length-prefixed key followed by a length-prefixed value (each prefix is a
    big-endian unsigned 16-bit length). Keys are decoded as ``subkeytype`` and
    values as ``subvaltype``; a repeated key keeps the last value.
    """
    def read_chunk(pos):
        # Return (payload bytes, position just past the payload).
        size = uint16_unpack(value[pos:pos + 2])
        begin = pos + 2
        return value[begin:begin + size], begin + size

    decoded = {}
    cursor = 2
    for _ in range(uint16_unpack(value[:2])):
        key_raw, cursor = read_chunk(cursor)
        val_raw, cursor = read_chunk(cursor)
        # Decode the key before the value, as the entries appear on the wire.
        key = decode_value([subkeytype], key_raw)
        decoded[key] = decode_value([subvaltype], val_raw)
    return decoded
def _make_packer(format_string):
try:
packer = struct.Struct(format_string) # new in Python 2.5
except AttributeError:
pack = lambda x: struct.pack(format_string, x)
unpack = lambda s: struct.unpack(format_string, s)
else:
pack = packer.pack
unpack = lambda s: packer.unpack(s)[0]
return pack, unpack
# (pack, unpack) function pairs built with _make_packer for fixed-width
# big-endian ('>') struct formats.
# Signed integers: 64, 32, 16 and 8 bits.
int64_pack, int64_unpack = _make_packer('>q')
int32_pack, int32_unpack = _make_packer('>i')
int16_pack, int16_unpack = _make_packer('>h')
int8_pack, int8_unpack = _make_packer('>b')
# Unsigned integers: 64, 32, 16 and 8 bits (uint16 reads collection length
# prefixes in get_collection / get_collection_map).
uint64_pack, uint64_unpack = _make_packer('>Q')
uint32_pack, uint32_unpack = _make_packer('>I')
uint16_pack, uint16_unpack = _make_packer('>H')
uint8_pack, uint8_unpack = _make_packer('>B')
# IEEE 754 single- and double-precision floats.
float_pack, float_unpack = _make_packer('>f')
double_pack, double_unpack = _make_packer('>d')
# Decoders for non-collection marshal types, keyed by short type name.
_SCALAR_DECODERS = {
    "Int32Type": int32_unpack,
    "UTF8Type": lambda raw: raw.decode("utf8"),
    "BooleanType": lambda raw: bool(int8_unpack(raw)),
    "FloatType": float_unpack,
    "DoubleType": double_unpack,
    "LongType": int64_unpack,
    # DateType is a 64-bit millisecond count; expose it as float seconds.
    "DateType": lambda raw: int64_unpack(raw) / 1000.0,
}


def decode_value(value_type, value):
    """Decode one raw column value according to its parsed type.

    :param value_type: type list as built in PycassaTable.execute_cql:
        ``[name]`` for simple types, ``[name, subtype]`` for SetType/ListType,
        ``[name, key_type, value_type]`` for MapType.
    :param value: the raw column value, or None.
    :returns: None for None; the decoded value for the types handled here;
        otherwise the raw value unchanged.
    """
    if value is None:
        return None
    type_name = value_type[0]
    decoder = _SCALAR_DECODERS.get(type_name)
    if decoder is not None:
        return decoder(value)
    if type_name == "SetType":
        # Sets are returned as lists (not Python sets).
        return get_collection(list, value_type[1], value)
    if type_name == "ListType":
        # Lists are returned as tuples.
        return get_collection(tuple, value_type[1], value)
    if type_name == "MapType":
        return get_collection_map(value_type[1], value_type[2], value)
    # Types without a decoder here (IntegerType/varint among them) are
    # returned as the raw value.
    return value
# Matches one single- or double-quoted string literal. There is no escape
# handling: a doubled quote ('') is matched as two adjacent literals, which
# are still both treated as literals. The capturing group makes re.split
# keep the literals as separate pieces.
stringlit_re = re.compile(r"""('[^']*'|"[^"]*")""")
# Matches /* block */ comments and // or -- line comments (re.MULTILINE
# makes $ stop line comments at the end of their line).
comments_re = re.compile(r'(/\*(?:[^*]|\*[^/])*\*/|//.*$|--.*$)', re.MULTILINE)
# Matches a ":name" placeholder. Group 1 is the preceding non-word
# character (callers put it back into the output); group 2 is the name.
param_re = re.compile(r'''
( \W ) # don't match : at the beginning of the text (meaning it
# immediately follows a comment or string literal) or
# right after an identifier character
: ( \w+ )
(?= \W ) # and don't match a param that is immediately followed by
# a comment or string literal either
''', re.IGNORECASE | re.VERBOSE)
def cql_quote(term):
    """Render a Python value as a CQL3 literal for inlining into a query.

    - Byte/native strings are wrapped in single quotes with embedded single
      quotes doubled. Python 2 ``unicode`` is UTF-8 encoded first.
    - ``bool`` becomes the CQL3 boolean literal ``true`` / ``false``.
    - ``float`` uses ``repr`` so the value round-trips exactly.
    - Anything else is rendered with ``str``.
    """
    def _escape_quotes(text):
        # CQL escapes a single quote inside a string literal by doubling it.
        return text.replace("'", "''")

    # Test bool before anything numeric (bool subclasses int). A quoted
    # 'True' is a string constant, which CQL3 rejects for boolean columns.
    if isinstance(term, bool):
        return 'true' if term else 'false'
    if isinstance(term, str):
        return "'%s'" % _escape_quotes(str(term))
    if isinstance(term, type(u'')):
        # Only reachable on Python 2 (unicode): encode to keep a byte query.
        return "'%s'" % _escape_quotes(term.encode('utf8'))
    if isinstance(term, float):
        # Python 2's str(float) keeps only 12 significant digits.
        return repr(term)
    return str(term)
def replace_param_substitutions(query, replacer):
    """Apply ``replacer`` to ``:name`` placeholders outside literals/comments.

    The query is padded with one space on each side so that a placeholder at
    the very start or end still has the non-word context param_re requires.
    It is then split into string literals, comments and plain code. Only the
    plain-code pieces go through ``param_re.sub(replacer, ...)``.

    :param query: CQL text.
    :param replacer: callable taking a param_re match, returning replacement
        text (it should keep group 1, the character before the colon).
    :returns: the rewritten query with the padding removed.
    """
    padded = ' ' + query + ' '
    pieces = []
    for chunk in stringlit_re.split(padded):
        if chunk[:1] in ('"', "'"):
            pieces.append(chunk)
        else:
            pieces.extend(comments_re.split(chunk))
    output = []
    for piece in pieces:
        # re.split yields empty strings at boundaries; e.g. a trailing line
        # comment swallows the pad space and leaves an empty last piece.
        if not piece:
            continue
        if piece[:1] in ('"', "'") or piece[:2] in ('--', '//', '/*'):
            output.append(piece)
        else:
            output.append(param_re.sub(replacer, piece))
    result = ''.join(output)
    # Sanity check on the joined text (not on individual pieces, which may
    # be empty): the padding must survive so [1:-1] strips only the padding.
    assert result[:1] == ' ' and result[-1:] == ' '
    return result[1:-1]
def prepare_inline(query, params):
    """Inline ``params`` into ``query`` at every ``:param_name`` placeholder.

    Each placeholder outside string literals and comments is replaced with
    ``cql_quote(params['param_name'])``; the character just before the colon
    is kept. Values are inlined as text (quoted by cql_quote), not bound.

    :raises KeyError: if the query names a parameter missing from ``params``.
    """
    def substitute(match):
        prefix, name = match.group(1, 2)
        return prefix + cql_quote(params[name])

    return replace_param_substitutions(query, substitute)
class PycassaTableResultType():
    """Row shapes accepted by ``PycassaTable.get_result(return_type=...)``.

    LIST: each row is a list of decoded column values in column order.
    DICT: each row is a dict mapping column name to decoded value.
    """
    LIST, DICT = 1, 2

    _VALUES_TO_NAMES = {LIST: "LIST", DICT: "DICT"}
    # Reverse lookup derived from the table above so the two cannot drift.
    _NAMES_TO_VALUES = dict((name, code) for code, name in _VALUES_TO_NAMES.items())
class PycassaTable(object):
    """Run CQL3 statements through a pycassa connection pool.

    Rows returned by SELECT queries are decoded with decode_value, using the
    column types reported in each result's schema.

    NOTE(review): the last result's column types are stored on the instance
    (``self.schema``) and read back by get_result, so one instance should not
    be shared by concurrent callers.
    """

    def __init__(self, conn_pool, consistencyLv=ConsistencyLevel.QUORUM):
        """
        :param conn_pool: pycassa ConnectionPool; connections are taken with
            ``get()`` and handed back with ``put()``.
        :param consistencyLv: thrift ConsistencyLevel used for every query.
        """
        self.conn_pool = conn_pool
        self.consistencyLv = consistencyLv

    @staticmethod
    def _parse_value_type(type_string):
        """Convert a marshal type string into decode_value's type list.

        Examples (package prefixes abbreviated):
          'o.a.c.db.marshal.Int32Type'                       -> ['Int32Type']
          'o.a.c.db.marshal.SetType(o.a.c...UTF8Type)'       -> ['SetType', 'UTF8Type']
          'o.a.c...MapType(o.a.c...UTF8Type,o.a.c...Int32Type)'
                                                -> ['MapType', 'UTF8Type', 'Int32Type']
          'o.a.c...ReversedType(o.a.c...Int32Type)'          -> ['Int32Type']
        """
        def short_name(qualified):
            # Drop the Java package prefix, keeping the class name.
            return qualified.strip().split('.')[-1]

        head, paren, args = type_string.partition('(')
        name = short_name(head)
        if paren and name == 'ReversedType':
            # DESC clustering columns are reported wrapped in ReversedType;
            # the value bytes use the wrapped type's encoding, so unwrap it.
            inner = args[:-1] if args.endswith(')') else args
            return PycassaTable._parse_value_type(inner)
        parsed = [name]
        subtypes = args.replace(')', '')
        if name in ('SetType', 'ListType'):
            parsed.append(short_name(subtypes))
        elif name == 'MapType':
            key_part, value_part = subtypes.split(',')[:2]
            parsed.append(short_name(key_part))
            parsed.append(short_name(value_part))
        return parsed

    def execute_cql(self, query, params=None):
        """Execute a CQL3 statement and record the result's column types.

        ``:name`` placeholders in ``query`` are filled from ``params`` via
        prepare_inline before the statement is sent with execute_cql3_query.
        The final query text is stored in ``self.query``. The parsed column
        types are stored in ``self.schema`` (column name -> type list for
        decode_value).

        :param query: CQL3 text, optionally with ``:name`` placeholders.
        :param params: mapping of placeholder name to value (default: empty).
        :returns: the raw thrift CqlResult.
        :raises KeyError: if a placeholder has no entry in ``params``.
        """
        if params is None:
            params = {}
        self.schema = {}
        conn = self.conn_pool.get()
        try:
            self.query = prepare_inline(query, params)
            cql_result = conn.execute_cql3_query(
                self.query, Compression.NONE, self.consistencyLv)
        finally:
            # Always hand the connection back, even when quoting or the query
            # raises; otherwise the pool is gradually exhausted.
            self.conn_pool.put(conn)
        # CqlResult.schema is an optional thrift field and is left unset (None)
        # for non-row results such as an INSERT's VOID result.
        if cql_result.schema is not None:
            for col_name, type_string in cql_result.schema.value_types.items():
                self.schema[col_name] = self._parse_value_type(type_string)
        return cql_result

    def get_result(self, query, params=None,
                   return_type=PycassaTableResultType.LIST):
        """Execute ``query`` and return its decoded rows.

        :param query: CQL3 text, optionally with ``:name`` placeholders.
        :param params: mapping of placeholder name to value (default: empty).
        :param return_type: PycassaTableResultType.LIST (each row is a list of
            values in column order) or PycassaTableResultType.DICT (each row
            is a dict keyed by column name).
        :returns: None for a VOID result, ``[num]`` for an INT result,
            otherwise a list of rows.
        :raises ValueError: if ``return_type`` is not LIST or DICT (checked
            before the query is executed).
        """
        if return_type == PycassaTableResultType.LIST:
            as_dict = False
        elif return_type == PycassaTableResultType.DICT:
            as_dict = True
        else:
            raise ValueError("unknown return_type: %r" % (return_type,))

        cql_result = self.execute_cql(query, params)
        if cql_result.type == CqlResultType.VOID:
            return None
        if cql_result.type == CqlResultType.INT:
            # Build the one-element list explicitly; list.append returns None.
            return [cql_result.num]

        schema = self.schema
        result = []
        for raw_row in cql_result.rows:
            if as_dict:
                row = dict((col.name, decode_value(schema[col.name], col.value))
                           for col in raw_row.columns)
            else:
                row = [decode_value(schema[col.name], col.value)
                       for col in raw_row.columns]
            result.append(row)
        return result
혹시 이 코드를 더 좋게 발전시키거나 더 나은 코드를 가지고 계신 분이 있다면 연락 좀 부탁드립니다.
반응형
'Cassandra' 카테고리의 다른 글
| Cassandra - Index 설정하기 (0) | 2013.06.20 |
|---|---|
| Cassandra - Set, List, Map 사용하기 (0) | 2013.06.19 |
| Cassandra - Table(Column Family) 생성, 수정 (CQL) (0) | 2013.05.24 |
| Cassandra - Keyspace 에 생성된 column family(table) 조회 (0) | 2013.05.24 |
| Cassandra - CQL 데이터 타입 (0) | 2013.05.24 |
pycassa_plugin.py