zoukankan      html  css  js  c++  java
  • libcassandra开发示例

    一、前言

    libcassandra是Cassandra官方推出的C/C++ API库。与thrift接口(另一个API库)相比,其接口更丰富,对类型匹配更细致。

    通过实践,私下也觉得libcassandra比thrift接口更好用。当然这只是个人观点。

    有关libcassandra的特点、安装等,在另一篇随笔里已有介绍,本篇把相应代码示例补上。

    二、环境

    创建keyspace、table如下:

    -- Keyspace used by the demo: SimpleStrategy with a replication factor of 2.
    CREATE KEYSPACE hs
      WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};
    
    USE hs;
    
    -- Table written and read by the sample program below:
    -- an int primary key (ID), a bigint (GID) and a blob payload (DATA).
    CREATE TABLE SAMPLE (
      ID    int,
      GID   bigint,
      DATA  blob,
      PRIMARY KEY (ID)
    );

    三、示例代码

    为简便起见,这里只给出INSERT和SELECT的示例;UPDATE和DELETE的写法与INSERT类似。

    #include "cassandra.h"
    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    
    class LibCassDemo
    {
    private:
         CassFuture     *m_pConnFuture;
         CassCluster    *m_pCluster;
         CassPrepared   *m_pInsertStmt = NULL;
         CassPrepared   *m_pSelectStmt = NULL;
    public:
         CassSession    *m_pSession;  //其实该成员变量也应是私有的,为偷懒改成公共的了
    
    public:
        LibCassDemo() {}
        ~LibCassDemo()
        {
            disconnect();
        }
    
        //连接Cassandra
        bool connect()
        {
           if (m_bConnect) return true;
           
           m_pCluster = cass_cluster_new();
           m_pSession = cass_session_new();
           cass_cluster_set_contact_points(m_pCluster, "localhost");
           cass_cluster_set_credentials(m_pCluster, "cassandra", "cassandra");
           m_pConnFuture = cass_session_connect(m_pSession, m_pCluster);
           if (cass_future_error_code(m_pConnFuture)!=CASS_OK)  //错误处理
           {
              const char* message;
              size_t message_length;
              cass_future_error_message(m_pConnFuture, &message, &message_length);
              fprintf(stderr, "Unable to connect: '%.*s'
    ", (int)message_length, message);
              return false;
           }
           printf("Connect Successful!
    ");
           m_bConnect = true;
           return m_bConnect;
        }
    
        //断开Cassandra
        void disconnect()
        {
           if (!m_bConnect) return;
           //关闭session并释放局部变量
           CassFuture *pCloseFuture = cass_session_close(m_pSession);
           cass_future_wait(pCloseFuture);
           cass_future_free(pCloseFuture);
           //释放所有成员变量
           if (m_pInsertStmt) cass_prepared_free(m_pInsertStmt);
           if (m_pSelectStmt) cass_prepared_free(m_pSelectStmt);
           cass_future_free(m_pConnFuture);
           cass_cluster_free(m_pCluster);
           cass_session_free(m_pSession);
           m_bConnect = false;
           printf("Disonnect Successful!
    ");
        }
    
        //相同的INSERT语句只须Prepare一次即可
        CassPrepared * buildInsertStmt()
        {
           string cql;
           if (m_pInsertStmt==NULL)
           {
              cql = "INSERT INTO SAMPLE (ID, GID, DATA) VALUES (?, ?, ?)";
              CassFuture *pPrepareFuture = cass_session_prepare(m_pSession, cql.c_str());
              if (cass_future_error_code(pPrepareFuture)!=CASS_OK)  //错误处理
              {
                 const char* message;
                 size_t message_length;
                 cass_future_error_message(pPrepareFuture, &message, &message_length);
                 fprintf(stderr, "Unable to prepare Insert Statement: '%.*s'
    ", (int)message_length, message);
              }
              else
                 m_pInsertStmt = (CassPrepared*)cass_future_get_prepared(pPrepareFuture);
              cass_future_free(pPrepareFuture);
           }
           return m_pInsertStmt;
        }
    
        //同样的,相同的SELECT语句也只须Prepare一次
        CassPrepared * buildSelectStmt()
        {
           string cql;
           if (m_pSelectStmt==NULL)
           {
              cql = "SELECT ID, GID, DATA FROM SAMPLE WHERE ID=? ";
              
              CassFuture *pPrepareFuture = cass_session_prepare(m_pSession, cql.c_str());
              if (cass_future_error_code(pPrepareFuture)!=CASS_OK)  //错误处理
              {
                 const char* message;
                 size_t message_length;
                 cass_future_error_message(pPrepareFuture, &message, &message_length);
                 fprintf(stderr, "Unable to prepare Select Statement: '%.*s'
    ", (int)message_length, message);
              }
              else
                 m_pSelectStmt = (CassPrepared*)cass_future_get_prepared(pPrepareFuture);
              cass_future_free(pPrepareFuture);
           }
           return m_pSelectStmt;
        }
    
       //获取int型数据
        int32_t getInt32FromRow(const CassRow *row, const uint16_t index)
        {
           int32_t i32;
           CassValue *value = (CassValue*)cass_row_get_column(row, index);
           cass_value_get_int32(value, &i32);
           return i32;
        }
    
        //获取bigint型数据
        int64_t getInt64FromRow(const CassRow* row, const uint16_t index)
        {
           int64_t i64;
           CassValue *value = (CassValue*)cass_row_get_column(row, index);
           cass_value_get_int64(value, &i64);
           return i64;
        }
    
        //获取blob型数据
        CassError getStringFromRow(const CassRow *row, const uint16_t index, UCHAR** data, int32_t &len)
        {
           const cass_byte_t* buf;
           size_t sz;
           CassValue *value = (CassValue*)cass_row_get_column(row, index);
           CassError cer = cass_value_get_bytes(value, &buf, &sz);
           if (cer==CASS_OK)
           {
              *data = (UCHAR *)malloc(sz +1);
              memcpy(*data, buf, sz);
              *(*data+sz+1)='';
              len = (int32_t)sz;
           }
           return cer;
       }
    
    }
    
    
    int main (int argc, char *argv[])
    {
       //初始化
       LibCassDemo *cass = new LibCassDemo();
       cass->connect();
       UCHAR primData[128];
       //Init primData here ...
    
       //写入数据
       CassPrepared *prepared = cass->buildInsertStmt();
       CassBatch *batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);  //批量方式
       for (int i=0; i<10; i++)
       {
          CassStatement * stmt = cass_prepared_bind(prepared);
          cass_statement_set_consistency(stmt, CASS_CONSISTENCY_QUORUM);
          cass_statement_bind_int32(stmt, 0, i);  //第一个是0
          cass_statement_bind_int64(stmt, 1, (i<<8 + i));
          cass_statement_bind_bytes(stmt, 2, primData, 128);
          cass_batch_add_statement(batch, stmt);
          cass_statement_free(stmt);
       }
         CassFuture *batchFuture = cass_session_execute_batch(cass->m_pSession, batch);
         if (cass_future_error_code(batchFuture)!=CASS_OK)  //错误处理
         {
            const char* message;
            size_t message_length;
            cass_future_error_message(batchFuture, &message, &message_length);
            fprintf(stderr, "Unable to execute BATCH: '%.*s'
    ", (int)message_length, message);
         }
         cass_future_free(batchFuture);
         cass_batch_free(batch);
    
       //读取数据
       prepared = cass->buildSelectStmt();
       stmt = cass_prepared_bind(prepared);
       int id=7;
       cass_statement_bind_int32(stmt, 0, id);
       CassFuture *readFuture = cass_session_execute(cass->m_pSession, stmt);
       CassResult *result = (CassResult*)cass_future_get_result(readFuture);  //从CassFuture获取查询结果
       CassIterator *iter = cass_iterator_from_result(result);  //转换为CassIterator
       while(cass_iterator_next(iter))
       {
          CassRow *row = (CassRow*)cass_iterator_get_row(iter);  //CassRow和CassError对象无须释放
          printf("ID=%d, GID=%ld, ", cass->getInt32FromRow(row, 0), cass->getInt64FromRow(row, 1));
          UCHAR *pData = (UCHAR *)malloc(128);
          int len;
          cass->getStringFromRow(row, 2, &pData, len);
          printf("DATA is :%s
    ", pData);
          free(pData);
       }
       cass_iterator_free(iter);
       cass_result_free(result);
       cass_future_free(readFuture);
       cass_statement_free(stmt);
    
       //释放资源
       delete cass;
    }
     
  • 相关阅读:
    ActiveReport换页的判断(当设置了repeatstyle为OnPage)
    创建与删除SQL约束或字段约束。 http://www.cnblogs.com/hanguoji/archive/2006/11/17/563871.html
    在SQL Server 2005中实现表的行列转换
    ActiveReport,Detail隐藏的问题
    SQL Server identity列的操作方法
    「預り」の意味
    POJ 1595 Prime Cuts
    Hdu Graph’s Cycle Component
    POJ 3250 Bad Hair Day
    Hdu 1548 A strange lift(BFS)
  • 原文地址:https://www.cnblogs.com/wggj/p/7227925.html
Copyright © 2011-2022 走看看