关于什么是cassandra,可以参考:
http://blog.csdn.net/zyz511919766/article/details/38683219
比较了HBASE、mongodb 和 cassandra
1)HBASE 和 cassandra 都是列式存储,但是 cassandra部署方便,扩展容易
2) mongodb 并不是真正的列式存储,数据扩容比较麻烦,需要提前做好集群分区
cassandra 是基于 p2p(gossip)实现的 bigtable,数据一致性可以通过参数配置(R+W > N):可以指定写操作是在所有节点(all node)都写入完成后才返回,还是在指定个数的节点写入完成后就返回。
数据模型:
尝试了cassandra的两个client。
1. "github.com/gocql/gocql"
2. "github.com/hailocab/gocassa"
gocassa是在gocql上面的封装,提供更方便的操作。
在用cassandra之前,建议先熟悉一下CQL,类似 SQL语句的语法。
作为一个client, 我们需要考虑的点:
1)连接池
2)批量操作
3)可能还会考虑同步操作(同时更新两个table中的数据)
cassandra部署和使用都还算简单,比较困难的是,要摆脱传统的db设计范式思维,要根据后续的数据查询来设计你的bigtable结构,方便将来的查询。
贴上几个相关的参考资料:
http://www.slideshare.net/yukim/cql3-in-depth (CQL相关介绍)
http://www.slideshare.net/jaykumarpatel/cassandra-data-modeling-best-practices
http://www.infoq.com/cn/articles/best-practices-cassandra-data-model-design-part2 (ebay的cassandra实践)
然后,贴上两个client使用示例:
// Example program for the gocql client: it opens a session, compares the
// timing of batched and non-batched inserts, and then runs a few queries.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

// main runs the example and exits with a non-zero status on the first fatal
// error. The work lives in run so that deferred cleanup (closing the session)
// executes before the process exits.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run performs the whole demo against a local Cassandra node and returns the
// first error that should abort the program. Failures of the timed inserts
// are only printed, so that the remaining measurements still run.
func run() error {
	// Connect to the cluster.
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.Keyspace = "demo"
	cluster.Consistency = gocql.Quorum
	// Size of the connection pool; the default is 2 (NumConns connections are
	// established for every host).
	cluster.NumConns = 3

	session, err := cluster.CreateSession()
	if err != nil {
		return fmt.Errorf("create session: %w", err)
	}
	defer session.Close()

	time.Sleep(1 * time.Second) // Sleep so the fillPool can complete.
	fmt.Println(session.Pool.Size())

	// Unlogged batch insert; batching works best when all rows share the
	// same partition key.
	t := time.Now()
	batch := session.NewBatch(gocql.UnloggedBatch)
	for i := 0; i < 100; i++ {
		batch.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, fmt.Sprintf("name_%d", i), fmt.Sprintf("ip_%d", i))
	}
	if err := session.ExecuteBatch(batch); err != nil {
		fmt.Println("execute batch:", err)
	}
	bt := time.Since(t).Nanoseconds()

	// The same 100 rows inserted one statement at a time. Exec is required:
	// Query alone only builds the statement and sends nothing to the server.
	t = time.Now()
	for i := 0; i < 100; i++ {
		if err := session.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, fmt.Sprintf("name_%d", i), fmt.Sprintf("ip_%d", i)).Exec(); err != nil {
			fmt.Println("execute insert:", err)
		}
	}
	nt := time.Since(t).Nanoseconds()

	// Unlogged batch in which every row uses the same partition key.
	t = time.Now()
	sbatch := session.NewBatch(gocql.UnloggedBatch)
	for i := 0; i < 100; i++ {
		sbatch.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, "samerow", fmt.Sprintf("ip_%d", i))
	}
	if err := session.ExecuteBatch(sbatch); err != nil {
		fmt.Println("execute batch:", err)
	}
	sbt := time.Since(t).Nanoseconds()

	fmt.Println("bt:", bt, "sbt:", sbt, "nt:", nt)
	//----------output------------------
	// ./rawtest
	// bt: 5795593 sbt: 3003774 nt: 261775
	//------------------------------------
	// NOTE: the nt figure above was recorded with an earlier version of this
	// example whose loop never called Exec, so it does not measure real
	// inserts.

	// Insert a tweet.
	if err := session.Query(`INSERT INTO tweet (timeline, id, text) VALUES (?, ?, ?)`,
		"me", gocql.TimeUUID(), "hello world").Exec(); err != nil {
		return err
	}

	var id gocql.UUID
	var text string

	/* Search for a specific set of records whose 'timeline' column matches
	 * the value 'me'. The secondary index that we created earlier will be
	 * used for optimizing the search */
	if err := session.Query(`SELECT id, text FROM tweet WHERE timeline = ? 
LIMIT 1`, "me").Consistency(gocql.One).Scan(&id, &text); err != nil {
		return err
	}
	fmt.Println("Tweet:", id, text)

	// List all tweets.
	iter := session.Query(`SELECT id, text FROM tweet WHERE timeline = ?`, "me").Iter()
	for iter.Scan(&id, &text) {
		fmt.Println("Tweet:", id, text)
	}
	if err := iter.Close(); err != nil {
		return err
	}

	// Read one row of bigrow into a generic column-name -> value map.
	query := session.Query(`SELECT * FROM bigrow where rowname = ?`, "30")
	// query := session.Query(`SELECT * FROM bigrow `)
	m := make(map[string]interface{}, 10)
	if err := query.Consistency(gocql.One).MapScan(m); err != nil {
		return err
	}
	fmt.Printf("%#v", m)
	return nil
}
// Example program for the gocassa client: it declares a table backed by a Go
// struct, writes one row and reads it back.
package main

import (
	"fmt"
	"time"

	"github.com/hailocab/gocassa"
)

// Sale is the row type stored in the "sale" table.
type Sale struct {
	Id         string
	CustomerId string
	SellerId   string
	Price      int
	Created    time.Time
}

// main connects to a Cassandra node at 127.0.0.1 using the keyspace name
// "not_exist_demo", creates the "sale" table, stores a single Sale and reads
// it back by its Id. Every failure other than table creation panics.
func main() {
	hosts := []string{"127.0.0.1"}
	ks, err := gocassa.ConnectToKeySpace("not_exist_demo", hosts, "", "")
	if err != nil {
		panic(err)
	}

	// "Id" is the only partition key of the table.
	keys := gocassa.Keys{PartitionKeys: []string{"Id"}}
	sales := ks.Table("sale", Sale{}, keys)

	// Table creation is best effort: the result is printed, never treated
	// as fatal.
	createErr := sales.Create()
	fmt.Println(createErr)

	// Store the first record.
	first := Sale{
		Id:         "sale-1",
		CustomerId: "customer-1",
		SellerId:   "seller-1",
		Price:      42,
		Created:    time.Now(),
	}
	if err := sales.Set(first).Run(); err != nil {
		panic(err)
	}

	// Fetch the record that was just written.
	var found Sale
	lookup := sales.Where(gocassa.Eq("Id", "sale-1")).ReadOne(&found)
	if err := lookup.Run(); err != nil {
		panic(err)
	}
	fmt.Println(found)
}
更多配置可参考:
https://github.com/gocql/gocql/blob/master/cluster.go#L57
// ClusterConfig is a struct to configure the default cluster implementation // of gocoql. It has a varity of attributes that can be used to modify the // behavior to fit the most common use cases. Applications that requre a // different setup must implement their own cluster. type ClusterConfig struct { Hosts []string // addresses for the initial connections CQLVersion string // CQL version (default: 3.0.0) ProtoVersion int // version of the native protocol (default: 2) Timeout time.Duration // connection timeout (default: 600ms) Port int // port (default: 9042) Keyspace string // initial keyspace (optional) NumConns int // number of connections per host (default: 2) NumStreams int // number of streams per connection (default: max per protocol, either 128 or 32768) Consistency Consistency // default consistency level (default: Quorum) Compressor Compressor // compression algorithm (default: nil) Authenticator Authenticator // authenticator (default: nil) RetryPolicy RetryPolicy // Default retry policy to use for queries (default: 0) SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0) ConnPoolType NewPoolFunc // The function used to create the connection pool for the session (default: NewSimplePool) DiscoverHosts bool // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false) MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000) MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000) PageSize int // Default page size to use for created sessions (default: 5000) SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset) Discovery DiscoveryConfig SslOpts *SslOptions DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at 
which it arrives at the server. (default: true, only enabled for protocol 3 and above) }
连接池,默认采用的是 NewSimplePool,这里用的是 round robin(轮询)策略
源码:https://github.com/gocql/gocql/blob/master/connectionpool.go#L454
ConnPoolType 的类型是 NewPoolFunc(用来创建连接池的函数),可以自己实现该函数来定制化自己的策略。
数据一致性策略,通过Consistency配置,默认是Quorum(大部分节点):
// Consistency is the 16-bit code that identifies a consistency level.
type Consistency uint16

// Consistency levels and their numeric codes. The codes are not contiguous:
// 0x08 and 0x09 are not assigned in this list.
const (
	Any         Consistency = 0x00
	One         Consistency = 0x01
	Two         Consistency = 0x02
	Three       Consistency = 0x03
	Quorum      Consistency = 0x04
	All         Consistency = 0x05
	LocalQuorum Consistency = 0x06
	EachQuorum  Consistency = 0x07
	LocalOne    Consistency = 0x0A
)