一、前言
libcassandra是Cassandra官方推出的C/C++ API库。与thrift接口(另一个API库)相比,其接口更丰富,对类型匹配更细致。
通过实践,私下也觉得libcassandra比thrift接口更好用。当然这只是个人观点。
有关libcassandra的特点、安装等,在另一篇随笔里已有介绍,本篇把相应代码示例补上。
二、环境
创建keyspace、table如下:
-- Schema used by the example: creates keyspace `hs` (SimpleStrategy, replication_factor 2), switches to it, and creates column family SAMPLE with ID int (primary key), GID bigint and DATA blob.
create keyspace hs WITH replication={'class':'SimpleStrategy', 'replication_factor':2}; use hs; create columnfamily SAMPLE( ID int, GID bigint, DATA blob, PRIMARY KEY(ID) );
三、示例代码
为简便起见,这里只给出insert和select的示例;update和delete的写法与insert类似。
#include "cassandra.h" #include <string.h> #include <stdio.h> class LibCassDemo { private: CassFuture *m_pConnFuture; CassCluster *m_pCluster; CassPrepared *m_pInsertStmt = NULL; CassPrepared *m_pSelectStmt = NULL; public: CassSession *m_pSession; //其实该成员变量也应是私有的,为偷懒改成公共的了 public: LibCassDemo() {} ~LibCassDemo() { disconnect(); } //连接Cassandra bool connect() { if (m_bConnect) return true; m_pCluster = cass_cluster_new(); m_pSession = cass_session_new(); cass_cluster_set_contact_points(m_pCluster, "localhost"); cass_cluster_set_credentials(m_pCluster, "cassandra", "cassandra"); m_pConnFuture = cass_session_connect(m_pSession, m_pCluster); if (cass_future_error_code(m_pConnFuture)!=CASS_OK) //错误处理 { const char* message; size_t message_length; cass_future_error_message(m_pConnFuture, &message, &message_length); fprintf(stderr, "Unable to connect: '%.*s' ", (int)message_length, message); return false; } printf("Connect Successful! "); m_bConnect = true; return m_bConnect; } //断开Cassandra void disconnect() { if (!m_bConnect) return; //关闭session并释放局部变量 CassFuture *pCloseFuture = cass_session_close(m_pSession); cass_future_wait(pCloseFuture); cass_future_free(pCloseFuture); //释放所有成员变量 if (m_pInsertStmt) cass_prepared_free(m_pInsertStmt); if (m_pSelectStmt) cass_prepared_free(m_pSelectStmt); cass_future_free(m_pConnFuture); cass_cluster_free(m_pCluster); cass_session_free(m_pSession); m_bConnect = false; printf("Disonnect Successful! 
"); } //相同的INSERT语句只须Prepare一次即可 CassPrepared * buildInsertStmt() { string cql; if (m_pInsertStmt==NULL) { cql = "INSERT INTO SAMPLE (ID, GID, DATA) VALUES (?, ?, ?)"; CassFuture *pPrepareFuture = cass_session_prepare(m_pSession, cql.c_str()); if (cass_future_error_code(pPrepareFuture)!=CASS_OK) //错误处理 { const char* message; size_t message_length; cass_future_error_message(pPrepareFuture, &message, &message_length); fprintf(stderr, "Unable to prepare Insert Statement: '%.*s' ", (int)message_length, message); } else m_pInsertStmt = (CassPrepared*)cass_future_get_prepared(pPrepareFuture); cass_future_free(pPrepareFuture); } return m_pInsertStmt; } //同样的,相同的SELECT语句也只须Prepare一次 CassPrepared * buildSelectStmt() { string cql; if (m_pSelectStmt==NULL) { cql = "SELECT ID, GID, DATA FROM SAMPLE WHERE ID=? "; CassFuture *pPrepareFuture = cass_session_prepare(m_pSession, cql.c_str()); if (cass_future_error_code(pPrepareFuture)!=CASS_OK) //错误处理 { const char* message; size_t message_length; cass_future_error_message(pPrepareFuture, &message, &message_length); fprintf(stderr, "Unable to prepare Select Statement: '%.*s' ", (int)message_length, message); } else m_pSelectStmt = (CassPrepared*)cass_future_get_prepared(pPrepareFuture); cass_future_free(pPrepareFuture); } return m_pSelectStmt; } //获取int型数据 int32_t getInt32FromRow(const CassRow *row, const uint16_t index) { int32_t i32; CassValue *value = (CassValue*)cass_row_get_column(row, index); cass_value_get_int32(value, &i32); return i32; } //获取bigint型数据 int64_t getInt64FromRow(const CassRow* row, const uint16_t index) { int64_t i64; CassValue *value = (CassValue*)cass_row_get_column(row, index); cass_value_get_int64(value, &i64); return i64; } //获取blob型数据 CassError getStringFromRow(const CassRow *row, const uint16_t index, UCHAR** data, int32_t &len) { const cass_byte_t* buf; size_t sz; CassValue *value = (CassValue*)cass_row_get_column(row, index); CassError cer = cass_value_get_bytes(value, &buf, &sz); if (cer==CASS_OK) { 
*data = (UCHAR *)malloc(sz +1); memcpy(*data, buf, sz); *(*data+sz+1)='