Python에서 Cassandra DB를 사용하기 위한 도구로는
Pycassa(https://github.com/pycassa/pycassa)와
cassandra-dbapi2(https://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/, PyPI 패키지명 cql)가 대표적이다.
Pycassa는 커넥션 풀 기능이 좋으나 CQL3를 사용하기가 번거롭고, cassandra-dbapi2는 CQL3를 사용하기 쉬우나 커넥션 풀 기능이 약하다.
Java로 프로그래밍한다면 JDBC 드라이버를 내려받아 사용하면 되지만, Python을 포기하기 싫어서 이래저래 알아보던 중 cassandra-dbapi2 코드 일부를 가져와 Pycassa에서 CQL3를 쉽게 사용할 수 있도록 플러그인(?) 같은 것을 만들었다.
(만들었다기보다는... 짜깁기했다? 적당히 긁어왔다? 정도가 맞는 것 같다.)
(2013.06.18 업데이트 - 컬렉션(set, list, map) 타입도 제대로 조회되도록 수정. 조회 결과에서 set은 list로, list는 tuple로, map은 dict로 반환된다.)
사용하려면 우선 Pycassa 1.9.0 이상 버전이 설치되어 있어야 한다. (상단의 링크를 타고 들어가서 설치한다.) 아래 코드는 xrange, unicode, basestring 등을 사용하므로 Python 2 환경에서만 동작한다.
사용 방법은 다음과 같다.
# -*- coding: utf-8 -*-
# Usage example for pycassa_plugin.
# The coding line above is required on Python 2 because the placeholder
# credentials below contain non-ASCII characters.
from pycassa import ConnectionPool
from pycassa_plugin import PycassaTable, PycassaTableResultType

pool = ConnectionPool('demodb', ['localhost:9160'],
                      {'username': '아이디가 있다면', 'password': '비밀번호'})
new_table = PycassaTable(pool)

# INSERT / UPDATE / DELETE
new_table.execute_cql("INSERT INTO EMP(EMPID, DEPTID, FIRST_NAME, LAST_NAME) VALUES(48, 630, 'EY', 'Shin')")

# SELECT: return_type (PycassaTableResultType.LIST or PycassaTableResultType.DICT)
# chooses whether each row comes back as a list or as a dict.
resultSet = new_table.get_result("SELECT * FROM EMP WHERE EMPID = :empid",
                                 {"empid": 48},
                                 return_type=PycassaTableResultType.LIST)
print(resultSet)
소스는 아래와 같다.
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Created on 2013. 6. 12.

@author: purepleya

Helpers for running CQL3 statements through a pycassa ConnectionPool and
decoding the raw result cells into Python values.

Most of the code below is taken from cql 1.4.0:
https://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/
https://pypi.python.org/pypi/cql/1.4.0

NOTE: Python 2 only (uses xrange, basestring and unicode).
'''
import re
import struct

from pycassa.cassandra.ttypes import Compression, ConsistencyLevel, CqlResultType


def _make_packer(format_string):
    """Build a ``(pack, unpack)`` pair for a one-value ``struct`` format.

    ``unpack(s)`` returns the single decoded value (not a 1-tuple) on both
    the ``struct.Struct`` path and the fallback path.
    """
    try:
        packer = struct.Struct(format_string)  # new in Python 2.5
    except AttributeError:
        pack = lambda x: struct.pack(format_string, x)
        # Index [0] so this path yields a scalar, like the Struct path below.
        unpack = lambda s: struct.unpack(format_string, s)[0]
    else:
        pack = packer.pack
        unpack = lambda s: packer.unpack(s)[0]
    return pack, unpack


int64_pack, int64_unpack = _make_packer('>q')
int32_pack, int32_unpack = _make_packer('>i')
int16_pack, int16_unpack = _make_packer('>h')
int8_pack, int8_unpack = _make_packer('>b')
uint64_pack, uint64_unpack = _make_packer('>Q')
uint32_pack, uint32_unpack = _make_packer('>I')
uint16_pack, uint16_unpack = _make_packer('>H')
uint8_pack, uint8_unpack = _make_packer('>B')
float_pack, float_unpack = _make_packer('>f')
double_pack, double_unpack = _make_packer('>d')


def _read_short_bytes(value, offset):
    """Read a byte string prefixed by a 2-byte unsigned big-endian length.

    :returns: ``(chunk, next_offset)`` where ``next_offset`` points just past
        the chunk.
    """
    length = uint16_unpack(value[offset:offset + 2])
    start = offset + 2
    return value[start:start + length], start + length


def get_collection(adapter, subtype, value):
    """Decode a serialized set/list cell.

    Layout read here: a 2-byte unsigned element count, then for each element
    a 2-byte unsigned length followed by that many bytes.

    :param adapter: callable applied to the list of decoded elements
        (``decode_value`` passes ``list`` for sets and ``tuple`` for lists).
    :param subtype: short marshal type name of the elements, e.g. ``"UTF8Type"``.
    :param value: raw bytes of the cell.
    """
    count = uint16_unpack(value[:2])
    offset = 2
    items = []
    for _ in xrange(count):
        item_bytes, offset = _read_short_bytes(value, offset)
        items.append(decode_value([subtype], item_bytes))
    return adapter(items)


def get_collection_map(subkeytype, subvaltype, value):
    """Decode a serialized map cell into a ``dict``.

    Layout read here: a 2-byte unsigned entry count, then for each entry a
    length-prefixed key followed by a length-prefixed value (2-byte unsigned
    lengths).

    :param subkeytype: short marshal type name of the keys.
    :param subvaltype: short marshal type name of the values.
    :param value: raw bytes of the cell.
    """
    count = uint16_unpack(value[:2])
    offset = 2
    themap = {}
    for _ in xrange(count):
        key_bytes, offset = _read_short_bytes(value, offset)
        val_bytes, offset = _read_short_bytes(value, offset)
        key = decode_value([subkeytype], key_bytes)
        themap[key] = decode_value([subvaltype], val_bytes)
    return themap


# Decoders for non-collection types, keyed by the short marshal class name.
# Types missing here and not handled as collections in decode_value (for
# example IntegerType/varint) are returned as raw bytes.
_SCALAR_DECODERS = {
    "Int32Type": int32_unpack,
    "UTF8Type": lambda raw: raw.decode("utf8"),
    "BooleanType": lambda raw: bool(int8_unpack(raw)),
    "FloatType": float_unpack,
    "DoubleType": double_unpack,
    "LongType": int64_unpack,
    # The stored int64 is divided by 1000.0, i.e. treated as milliseconds
    # and returned as float seconds.
    "DateType": lambda raw: int64_unpack(raw) / 1000.0,
}


def decode_value(value_type, value):
    """Convert one raw result cell into a Python value.

    :param value_type: parsed type list as stored in ``PycassaTable.schema``:
        ``[name]`` for scalar types, ``[name, element_type]`` for
        ``SetType``/``ListType`` and ``[name, key_type, value_type]`` for
        ``MapType``.
    :param value: raw cell bytes, or ``None``.
    :returns: ``None`` for a ``None`` cell; otherwise the decoded value. Sets
        decode to ``list``, lists to ``tuple``, maps to ``dict``; unknown
        types are returned unchanged.
    """
    if value is None:
        return None
    type_name = value_type[0]
    decoder = _SCALAR_DECODERS.get(type_name)
    if decoder is not None:
        return decoder(value)
    if type_name == "SetType":
        return get_collection(list, value_type[1], value)
    if type_name == "ListType":
        return get_collection(tuple, value_type[1], value)
    if type_name == "MapType":
        return get_collection_map(value_type[1], value_type[2], value)
    return value


stringlit_re = re.compile(r"""('[^']*'|"[^"]*")""")
comments_re = re.compile(r'(/\*(?:[^*]|\*[^/])*\*/|//.*$|--.*$)', re.MULTILINE)
param_re = re.compile(r'''
    ( \W )            # don't match : at the beginning of the text (meaning it
                      # immediately follows a comment or string literal) or
                      # right after an identifier character
    : ( \w+ )
    (?= \W )          # and don't match a param that is immediately followed by
                      # a comment or string literal either
''', re.IGNORECASE | re.VERBOSE)


def cql_quote(term):
    """Render a Python value as a CQL literal for inline substitution.

    - ``unicode``: UTF-8 encoded, wrapped in single quotes, ``'`` doubled.
    - ``str`` / ``bool``: ``str()`` of the value, single-quoted, ``'`` doubled.
    - ``float``: ``repr()``, because Python 2's ``str()`` rounds floats to
      12 significant digits and would silently lose precision.
    - anything else: ``str(term)``, unquoted.
    """
    def _escape_quotes(text):
        assert isinstance(text, basestring)
        return text.replace("'", "''")

    if isinstance(term, unicode):
        return "'%s'" % _escape_quotes(term.encode('utf8'))
    if isinstance(term, (str, bool)):
        return "'%s'" % _escape_quotes(str(term))
    if isinstance(term, float):
        return repr(term)
    return str(term)


def replace_param_substitutions(query, replacer):
    """Apply ``param_re.sub(replacer, ...)`` to ``query`` outside of string
    literals and comments.

    The query is padded with one space on each side so ``param_re`` (which
    needs a non-word character around ``:name``) can match at the edges; the
    padding is stripped from the result.
    """
    split_strings = stringlit_re.split(' ' + query + ' ')
    split_str_and_cmt = []
    for piece in split_strings:
        if piece[:1] in '\'"':
            # String literal (or empty piece): keep intact.
            split_str_and_cmt.append(piece)
        else:
            split_str_and_cmt.extend(comments_re.split(piece))
    output = []
    for piece in split_str_and_cmt:
        if piece[:1] in '\'"' or piece[:2] in ('--', '//', '/*'):
            output.append(piece)
        else:
            output.append(param_re.sub(replacer, piece))
    assert output[0][0] == ' ' and output[-1][-1] == ' '
    return ''.join(output)[1:-1]


def prepare_inline(query, params):
    """
    For every match of the form ":param_name", call cql_quote on
    params['param_name'] and replace that section of the query with the
    result. A missing name raises KeyError.
    """
    def param_replacer(match):
        return match.group(1) + cql_quote(params[match.group(2)])
    return replace_param_substitutions(query, param_replacer)


class PycassaTableResultType(object):
    """Row shapes accepted by ``PycassaTable.get_result(return_type=...)``."""
    LIST = 1  # each row is a list of values, in the order the columns arrive
    DICT = 2  # each row is a dict of column name -> value

    _VALUES_TO_NAMES = {
        1: "LIST",
        2: "DICT",
    }

    _NAMES_TO_VALUES = {
        "LIST": 1,
        "DICT": 2,
    }


def _short_type_name(qualified_name):
    """Return the last dotted component of a qualified class name."""
    return qualified_name.split(".")[-1]


def _parse_value_type(type_string):
    """Parse a result-schema type string into the list used by decode_value.

    For example
    ``"org.apache.cassandra.db.marshal.MapType(org...UTF8Type,org...Int32Type)"``
    becomes ``["MapType", "UTF8Type", "Int32Type"]``. Set/list types yield
    ``[name, element_type]``; every other type yields ``[name]``.
    """
    head_and_args = type_string.split("(")
    parsed = [_short_type_name(head_and_args[0])]
    if parsed[0] in ("SetType", "ListType"):
        parsed.append(_short_type_name(head_and_args[1].replace(")", "")))
    elif parsed[0] == "MapType":
        sub_types = head_and_args[1].replace(")", "").split(",")
        parsed.append(_short_type_name(sub_types[0]))
        parsed.append(_short_type_name(sub_types[1]))
    return parsed


class PycassaTable(object):
    """Run CQL3 statements over a pycassa ConnectionPool.

    After each ``execute_cql`` call, ``self.query`` holds the statement that
    was sent. ``self.schema`` maps each result column name to its parsed type:

    - ``["TypeName"]`` for plain types,
    - ``["SetType"|"ListType", "SubType"]`` for set/list,
    - ``["MapType", "KeyType", "ValueType"]`` for map.
    """

    def __init__(self, conn_pool, consistencyLv = ConsistencyLevel.QUORUM):
        self.conn_pool = conn_pool
        self.consistencyLv = consistencyLv

    def execute_cql(self, query, params=None):
        """Execute a CQL3 statement and record its result column types.

        :param query: CQL3 text; ``:name`` placeholders are replaced inline
            with ``cql_quote(params[name])``.
        :param params: mapping of placeholder name -> value (default: none).
        :returns: the raw result object from ``execute_cql3_query``.
        """
        self.schema = {}
        # Build the query before taking a connection, so a missing parameter
        # (KeyError) cannot leave a connection checked out.
        self.query = prepare_inline(query, params or {})
        conn = self.conn_pool.get()
        try:
            cql_result = conn.execute_cql3_query(self.query, Compression.NONE, self.consistencyLv)
        finally:
            # Always return the connection, even if the query raises, so a
            # failed statement does not permanently occupy a pool slot.
            # (Returning the connection to the pool was contributed by mocheogil.)
            self.conn_pool.put(conn)

        # NOTE(review): the result schema appears to be unset for statements
        # that return no rows (INSERT/UPDATE/DELETE); guard against None
        # instead of raising AttributeError. Confirm against the Thrift IDL.
        schema = getattr(cql_result, 'schema', None)
        value_types = getattr(schema, 'value_types', None) or {}
        for col_name, type_string in value_types.items():
            self.schema[col_name] = _parse_value_type(type_string)
        return cql_result

    def get_result(self, query, params=None, return_type = PycassaTableResultType.LIST):
        """Execute ``query`` and return decoded results.

        :param query: CQL3 text with optional ``:name`` placeholders.
        :param params: mapping of placeholder name -> value (default: none).
        :param return_type: ``PycassaTableResultType.LIST`` or ``.DICT``;
            selects whether each row is a list or a dict.
        :returns: ``None`` for a VOID result, ``[num]`` for an INT result,
            otherwise a list of rows.
        :raises ValueError: if a row result is returned and ``return_type``
            is not a known ``PycassaTableResultType`` value.
        """
        cql_result = self.execute_cql(query, params)
        if cql_result.type == CqlResultType.VOID:
            return None
        elif cql_result.type == CqlResultType.INT:
            # list.append() returns None, so build the one-element list directly.
            return [cql_result.num]

        if return_type not in (PycassaTableResultType.LIST, PycassaTableResultType.DICT):
            raise ValueError("unknown return_type: %r" % (return_type,))

        schema = self.schema
        result = []
        for raw_row in cql_result.rows or []:
            if return_type == PycassaTableResultType.LIST:
                row = [decode_value(schema[col.name], col.value)
                       for col in raw_row.columns]
            else:
                row = dict((col.name, decode_value(schema[col.name], col.value))
                           for col in raw_row.columns)
            result.append(row)
        return result
혹시 이 코드를 더 좋게 발전시키셨거나 더 좋은 코드를 가지고 계신 분은 연락 부탁드립니다.
반응형
'Cassandra' 카테고리의 다른 글
Cassandra - Index 설정하기 (0) | 2013.06.20 |
---|---|
Cassandra - Set, List, Map 사용하기 (0) | 2013.06.19 |
Cassandra - Table(Column Family) 생성, 수정 (CQL) (0) | 2013.05.24 |
Cassandra - Keyspace 에 생성된 column family(table) 조회 (0) | 2013.05.24 |
Cassandra - CQL 데이터 타입 (0) | 2013.05.24 |