본문 바로가기

Cassandra

Cassandra - Pycassa에서 CQL3 사용하기!!!!

Python에서 Cassandra db를 사용하기 위한 도구로

Pycassa (https://github.com/pycassa/pycassa) 와 

cassandra-dbapi2(https://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/) 가 대표적이다.


Pycassa는 커넥션 풀 기능이 좋으나 CQL3를 사용하기가 번거롭고, cassandra-dbapi2는 CQL3를 사용하기 쉬우나 커넥션 풀 기능이 약하다.


Java로 프로그래밍한다면 JDBC 드라이버가 있으니까 내려받아서 사용하면 되지만, Python을 포기하기 싫어서 이래저래 알아보던 중 cassandra-dbapi2 코드 중 일부를 가져와 Pycassa에서 쉽게 CQL3를 사용할 수 있도록 플러그인(?) 같은 걸 만들었다.

(만들었다기보다는... 짜깁기했다? 적당히 긁어왔다? 정도가 적당한 것 같다.)


(2013.06.18 update - collection (set, list, map) 타입도 조회 잘 되게 수정)


pycassa_plugin.py



사용하기 위해서는 우선 Pycassa 1.9.0 이상 버전이 설치되어 있어야 한다. (상단의 링크를 타고 들어가서 설치한다.)


사용하는 방법은

from pycassa import ConnectionPool
from pycassa_plugin import PycassaTable, PycassaTableResultType

# The third argument is the credentials dict; the two values below are
# placeholders for a real user name and password.
pool = ConnectionPool('demodb', ['localhost:9160'], {'username':'아이디가 있다면', 'password':'비밀번호'})
new_table = PycassaTable(pool)

# Insert / update / delete statements
new_table.execute_cql("INSERT INTO EMP(EMPID, DEPTID, FIRST_NAME, LAST_NAME) VALUES(48, 630, 'EY', 'Shin')")

# Query: each row comes back as a list or a dict depending on return_type
# (PycassaTableResultType.LIST or PycassaTableResultType.DICT).
resultSet = new_table.get_result("SELECT * FROM EMP WHERE EMPID = :empid", {"empid" : 48}, return_type = PycassaTableResultType.LIST)
print(resultSet)





소스는 아래와 같다.


#-*- coding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
Created on 2013. 6. 12.

@author: purepleya

most of the code below is taken from cql 1.4.0
https://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/
https://pypi.python.org/pypi/cql/1.4.0
'''
import re
import struct
from pycassa.cassandra.ttypes import Compression, ConsistencyLevel, CqlResultType


def get_collection(adapter, subtype, value):
    """Decode a serialized single-type collection and wrap it with ``adapter``.

    The buffer starts with an unsigned 16-bit big-endian element count.
    Each element follows as an unsigned 16-bit length and then that many
    bytes of payload.

    adapter -- callable applied to the list of decoded elements
    subtype -- element type name, handed to decode_value as ``[subtype]``
    value   -- raw serialized collection
    """
    count = uint16_unpack(value[:2])
    offset = 2
    decoded = []
    for _ in xrange(count):
        # Two length bytes first, then the element payload itself.
        start = offset + 2
        end = start + uint16_unpack(value[offset:start])
        decoded.append(decode_value([subtype], value[start:end]))
        offset = end
    return adapter(decoded)

def get_collection_map(subkeytype, subvaltype, value):
    """Decode a serialized map into a dict.

    The buffer starts with an unsigned 16-bit big-endian entry count.
    Each entry is a length-prefixed key followed by a length-prefixed
    value, both prefixes being unsigned 16-bit integers.

    subkeytype -- key type name, handed to decode_value as ``[subkeytype]``
    subvaltype -- value type name, handed to decode_value as ``[subvaltype]``
    value      -- raw serialized map
    """
    def read_chunk(offset):
        # Return the length-prefixed chunk at ``offset`` and the offset
        # immediately after it.
        start = offset + 2
        end = start + uint16_unpack(value[offset:start])
        return value[start:end], end

    entry_count = uint16_unpack(value[:2])
    cursor = 2
    decoded = {}
    for _ in xrange(entry_count):
        raw_key, cursor = read_chunk(cursor)
        raw_val, cursor = read_chunk(cursor)
        map_key = decode_value([subkeytype], raw_key)
        decoded[map_key] = decode_value([subvaltype], raw_val)
    return decoded
    

def _make_packer(format_string):
    """Build a ``(pack, unpack)`` pair for a single-value struct format.

    format_string -- a ``struct`` format describing exactly one value

    Returns two callables: ``pack(x)`` serializes one value and
    ``unpack(s)`` returns the single decoded value (not a 1-tuple).
    """
    try:
        packer = struct.Struct(format_string) # new in Python 2.5
    except AttributeError:
        # Fallback for interpreters without struct.Struct.
        pack = lambda x: struct.pack(format_string, x)
        # struct.unpack returns a tuple; take element 0 so this path
        # yields the same kind of result as the Struct-based path below.
        unpack = lambda s: struct.unpack(format_string, s)[0]
    else:
        pack = packer.pack
        unpack = lambda s: packer.unpack(s)[0]
    return pack, unpack

# Big-endian (">") packers/unpackers for the fixed-width primitive types.
int64_pack, int64_unpack = _make_packer('>q')
int32_pack, int32_unpack = _make_packer('>i')
int16_pack, int16_unpack = _make_packer('>h')
int8_pack, int8_unpack = _make_packer('>b')
uint64_pack, uint64_unpack = _make_packer('>Q')
uint32_pack, uint32_unpack = _make_packer('>I')
uint16_pack, uint16_unpack = _make_packer('>H')
uint8_pack, uint8_unpack = _make_packer('>B')
float_pack, float_unpack = _make_packer('>f')
double_pack, double_unpack = _make_packer('>d')

def decode_value(value_type, value):
    """Decode one raw column value according to its type descriptor.

    value_type -- descriptor list: ``[type_name]`` for plain types,
                  ``[type_name, element_type]`` for SetType / ListType,
                  ``[type_name, key_type, value_type]`` for MapType
    value      -- raw serialized value, or None

    Returns None for a None value. Sets are returned as ``list`` and lists
    as ``tuple``. Any type name without a branch below (including
    IntegerType, for which no varint decoder exists here) is returned
    unchanged as the raw value.
    """
    # Identity check: "== None" would go through the value's __eq__.
    if value is None:
        return None

    type_name = value_type[0]

    if type_name == "Int32Type":
        return int32_unpack(value)
    if type_name == "UTF8Type":
        return value.decode("utf8")
    if type_name == "BooleanType":
        return bool(int8_unpack(value))
    if type_name == "FloatType":
        return float_unpack(value)
    if type_name == "DoubleType":
        return double_unpack(value)
    if type_name == "LongType":
        return int64_unpack(value)
    if type_name == "DateType":
        # NOTE(review): presumably milliseconds since the epoch converted
        # to float seconds - confirm against the server-side encoding.
        return int64_unpack(value) / 1000.0
    if type_name == "SetType":
        return get_collection(list, value_type[1], value)
    if type_name == "ListType":
        return get_collection(tuple, value_type[1], value)
    if type_name == "MapType":
        return get_collection_map(value_type[1], value_type[2], value)
    return value



# A single- or double-quoted string literal that contains no quote
# character of its own kind. The capturing group makes re.split keep the
# literals in its output.
stringlit_re = re.compile(r"""('[^']*'|"[^"]*")""")
# A /* ... */ block comment, or a // or -- comment running to end of line
# (MULTILINE lets "$" match at every line end). Also captured, so
# re.split keeps the comments.
comments_re = re.compile(r'(/\*(?:[^*]|\*[^/])*\*/|//.*$|--.*$)', re.MULTILINE)
# A ":name" placeholder. Group 1 is the non-word character in front of the
# colon and group 2 is the parameter name; the placeholder must also be
# followed by a non-word character.
param_re = re.compile(r'''
    ( \W )            # don't match : at the beginning of the text (meaning it
                      # immediately follows a comment or string literal) or
                      # right after an identifier character
    : ( \w+ )
    (?= \W )          # and don't match a param that is immediately followed by
                      # a comment or string literal either
    ''', re.IGNORECASE | re.VERBOSE)

def cql_quote(term):
    """Render ``term`` as text suitable for inlining into a CQL query.

    unicode       -> UTF-8 encoded, wrapped in single quotes
    str / bool    -> str(term), wrapped in single quotes
    anything else -> str(term), unquoted

    Inside quoted output every single quote is doubled.
    """
    if isinstance(term, unicode):
        text = term.encode('utf8')
    elif isinstance(term, (str, bool)):
        text = str(term)
    else:
        return str(term)

    assert isinstance(text, basestring)
    return "'%s'" % text.replace("'", "''")
    
def replace_param_substitutions(query, replacer):
    """Apply ``replacer`` to every parameter placeholder in ``query``.

    The query is cut into string literals, comments and remaining text.
    Literals and comments are copied through untouched; only the remaining
    text is run through ``param_re.sub(replacer, ...)``.

    query    -- query text containing ":name" placeholders
    replacer -- callable taking a param_re match and returning replacement text
    """
    quote_chars = '\'"'
    comment_openers = ('--', '//', '/*')

    # The query is padded with one space on each side before splitting;
    # the padding is stripped again on return.
    fragments = []
    for piece in stringlit_re.split(' ' + query + ' '):
        if piece[:1] in quote_chars:
            fragments.append(piece)
        else:
            fragments.extend(comments_re.split(piece))

    # An empty fragment also satisfies the first test ('' is "in" any
    # string) and is therefore copied through unchanged.
    rewritten = [
        frag if (frag[:1] in quote_chars or frag[:2] in comment_openers)
        else param_re.sub(replacer, frag)
        for frag in fragments
    ]
    assert rewritten[0][0] == ' ' and rewritten[-1][-1] == ' '
    return ''.join(rewritten)[1:-1]

def prepare_inline(query, params):
    """
    Inline named parameters into a CQL query string.

    Each ":param_name" placeholder located by replace_param_substitutions
    is replaced with cql_quote(params['param_name']).

    query  -- query text with ":param_name" placeholders
    params -- mapping from parameter name to value; a placeholder whose
              name is missing from the mapping raises KeyError
    """
    def substitute(found):
        # group 1: the character in front of the colon (kept as is)
        # group 2: the parameter name
        prefix, name = found.group(1, 2)
        return prefix + cql_quote(params[name])

    return replace_param_substitutions(query, substitute)



class PycassaTableResultType():
    """Constants selecting the per-row container built by get_result."""
    LIST = 1
    DICT = 2

    # Name -> value lookup, and its inverse derived from it.
    _NAMES_TO_VALUES = {"LIST": LIST, "DICT": DICT}
    _VALUES_TO_NAMES = dict((number, label) for label, number in _NAMES_TO_VALUES.items())

    
class PycassaTable(object):
    """Run CQL3 statements through a connection pool and decode the rows.

    Attributes overwritten by every execute_cql call:
        query  -- the query text after parameter inlining
        schema -- {column name: type descriptor} for the last result, where
                  a descriptor is ["TypeName"] for plain types,
                  ["SetType" | "ListType", "ElementType"] for sets/lists and
                  ["MapType", "KeyType", "ValueType"] for maps

    NOTE(review): both attributes live on the instance, so sharing one
    PycassaTable between threads looks unsafe - confirm before doing so.
    """

    def __init__(self, conn_pool, consistencyLv = ConsistencyLevel.QUORUM):
        """
        conn_pool     -- pool object exposing get() and put(conn)
        consistencyLv -- consistency level sent with every query
        """
        self.conn_pool = conn_pool
        self.consistencyLv = consistencyLv

    @staticmethod
    def _parse_type_spec(type_spec):
        """Convert a dotted type string such as ``a.b.MapType(a.b.X,a.b.Y)``
        into a descriptor list holding only the unqualified class names."""
        pieces = type_spec.split("(")
        descriptor = [pieces[0].split(".")[-1]]
        kind = descriptor[0]

        if kind in ("SetType", "ListType"):
            descriptor.append(pieces[1].replace(")", "").split(".")[-1])
        elif kind == "MapType":
            type_args = pieces[1].replace(")", "").split(",")
            descriptor.append(type_args[0].split(".")[-1])
            descriptor.append(type_args[1].split(".")[-1])
        return descriptor

    def execute_cql(self, query, params = None):
        """Inline ``params`` into ``query``, execute it and record the schema.

        query  -- CQL3 text, optionally with ":name" placeholders
        params -- mapping of placeholder name to value (default: no params)

        Returns the raw result object from execute_cql3_query. Exceptions
        from parameter inlining or from the query propagate to the caller.
        """
        if params is None:
            params = {}

        self.schema = {}
        # Build the query before borrowing a connection so a bad parameter
        # cannot strand a connection outside the pool.
        self.query = prepare_inline(query, params)

        conn = self.conn_pool.get()
        try:
            cql_result = conn.execute_cql3_query(self.query, Compression.NONE, self.consistencyLv)
        finally:
            # Always hand the connection back, even when the query fails.
            # (Returning the connection was originally contributed by mocheogil.)
            self.conn_pool.put(conn)

        # NOTE(review): results without rows may carry no schema at all;
        # treat a missing schema as "no columns" - confirm against the server.
        result_schema = cql_result.schema
        value_types = result_schema.value_types if result_schema is not None else None
        if value_types:
            for col_name, type_spec in value_types.items():
                self.schema[col_name] = self._parse_type_spec(type_spec)

        return cql_result

    def get_result(self, query, params = None, return_type = PycassaTableResultType.LIST):
        """Execute a query and return its decoded rows.

        query       -- CQL3 text, optionally with ":name" placeholders
        params      -- mapping of placeholder name to value (default: no params)
        return_type -- PycassaTableResultType.LIST for rows as lists of
                       values, PycassaTableResultType.DICT for rows as
                       {column name: value} dicts

        Returns None for a VOID result, ``[num]`` for an INT result, and
        otherwise a list with one decoded row per result row.
        Raises ValueError when rows must be built and return_type is unknown.
        """
        cql_result = self.execute_cql(query, params)

        if cql_result.type == CqlResultType.VOID:
            return None
        if cql_result.type == CqlResultType.INT:
            # list.append returns None, so build the one-element list directly.
            return [cql_result.num]

        if return_type == PycassaTableResultType.LIST:
            as_dict = False
        elif return_type == PycassaTableResultType.DICT:
            as_dict = True
        else:
            raise ValueError("unknown return_type: %r" % (return_type,))

        result = []
        for raw_row in cql_result.rows:
            decoded = [
                (raw_col.name, decode_value(self.schema[raw_col.name], raw_col.value))
                for raw_col in raw_row.columns
            ]
            if as_dict:
                result.append(dict(decoded))
            else:
                result.append([col_value for _, col_value in decoded])

        return result



혹시 누군가 이 코드를 더 좋은 코드로 발전시키거나 더 좋은 코드를 가지고 계시다면 연락 좀 부탁드립니다.

반응형