简介
Cassandra 是由 Facebook 开发的、用于大数据的、开源分布式的 NoSQL 数据库
无单点故障:Cassandra 节点按环形排列,没有中心节点,每个节点独立互联地扮演相同角色,每个节点都可以接受读写请求,数据可以有多个副本存储在多个节点,节点之间通过 Gossip (P2P) 协议交换状态信息,集群中有若干节点配为种子节点,用于新加入的节点获取集群拓扑结构并启动 Gossip 协议
提供类 SQL 语言 CQL,基本命令几乎和 SQL 一致
不支持 Join 和子查询
支持 group by 和聚合操作,支持 map、set、list 等数据类型
适合结构化、半结构化、非结构化数据
高度可扩展,允许添加硬件、节点以提高数据容量,同时保持快速的响应时间
通过 Consistency 命令可以配置一致性级别,主要是通知客户端操作前,必须确保的 replica 的成功数量
Cassandra 采用的是最终一致性,是 CAP 理论里的 AP
随便提一下另一个 NoSQL 数据库 HBase 是 CP,HBase 的可用性比 Cassandra 低,并且组件架构比较复杂,维护成本比较高
读写
CommitLog:磁盘文件,记录写操作,系统崩溃时丢失的内存数据可以从 CommitLog 恢复
MemTable:驻留内存的数据结构,数据将先被写入 MemTable
SSTable:磁盘文件,当 MemTable 内存达到阀值,或 CommitLog 达到阀值时,数据会被写入 SSTable
BloomFilter:用于减少不必要的 SSTable 的读取
可以通过 ./bin/nodetool flush 命令强制刷入 SSTable
表结构
Cassandra 的数据模型不是关系型,也不是列式存储,而是类似于多层键值对的结构
Cassandra 由 Keyspace (类似关系型数据库里的 database) 和 Table 组成
而 Table 逻辑上像是一个包含了多个 Partition Key,多个 Clustering Key,多个普通 Column 的,多层次的 Key-Value 结构,其中 Partition Key 用于分区,Cassandra 对 Partition Key 做 Hash 后,根据 Hash 找到对应 Node 的 Token,把数据存到相应的 Node 上,而 Clustering Key 则是分区内的排序索引,Partition Key 和 Clustering Key 合起来又叫 Primary Key
比如
CREATE TABLE my_table (
pkey1 int,
pkey2 int,
ckey1 int,
ckey2 int,
content text,
PRIMARY KEY ((pkey1, pkey2), ckey1, ckey2)
);
其中 pkey1 和 pkey2 是 Partition Key,而 ckey1 和 ckey2 是 Clustering Key,而 content 是普通的列
其结构类似于
Map<pkey1, Map<pkey2, SortedMap<ckey1, SortedMap<ckey2, content>>>>
可以看到这种结构,如果不先找出 pkey1 就无法找出 pkey2,必须先找到前面的 key 才能找到后面的
所以 Cassandra 的 Where 语句,需要指定所有的 Partition Key,比如
select * from my_table where pkey1 = 100;
select * from my_table where pkey2 = 200;
这样会报错,系统认为这样无法定位到分区,需要遍历所有分区的所有 partition key,会有性能问题,必须是
select * from my_table where pkey1 = 100 and pkey2 = 200;
而 Clustering Key 可以部分指定,但不能跳过前面的 Key 去查询后面的,比如
select * from my_table where pkey1 = 100 and pkey2 = 200 and ckey2 = 400;
这样会报错,因为 ckey2 的前面有 ckey1,必须先指定 ckey1,或者只指定 ckey1 也行
Partition Key 还无法用于范围查询,比如
select * from my_table where pkey1 > 100 and pkey2 < 200;
这样同样会报错,因为系统还是要遍历所有的 Partition Key
而如果是对 Clustering Key 做范围查询就可以,比如
select * from my_table where pkey1 = 100 and pkey2 = 200 and ckey1 > 300;
这些查询被阻止都是因为系统认为可能性能低,如果一定要这样查询是可以的,要加上 allow filtering
select * from my_table where pkey1 = 100 allow filtering;
select * from my_table where pkey2 = 200 allow filtering;
select * from my_table where pkey1 = 100 and pkey2 = 200 and ckey2 = 400 allow filtering;
select * from my_table where pkey1 > 100 and pkey2 < 200 allow filtering;
系统只是认为性能可能会差,不代表一定是差的,我们先看一下 Cassandra 表的存储
Partition Key 用于分区和排序,即按照 Partition Key 的 Hash Token 决定了数据被分配到哪个节点,并且在节点内也是按该 Hash Token 按序存储的,有相同 Partition Key 的数据会存在一起,并且按照 Clustering Key 排序存储,就是说 Cassandra 是先按 Partition Key 的 Token 分区并排序存储,内部再按 Clustering Key 排序存储,再存储普通 Column 的值 (Column 值不排序)
考虑一下,如果一张表的 Partition Key 是 ID,而 Clustering Key 是 timestamp,如果 ID 就 1000 个,而 timestamp 存了一个月的数据,每分钟一个 timestamp,每个 ID 一个月共 43200 个数据,全表共 43200000 个数据,现在要查询某一小时的所有 ID 的数据,这时 Where 后面只查询 timestamp 然后通过 allow filtering 强行查询,这样也只是遍历了 1000 个 ID,每个 ID 再查 60 条 timestamp,而这 60 条是排序好的,可以很快,所以并不会遍历全表,虽然有 4000 多万数据,但性能不会差
Token
Token 是 Cassandra 用来平衡个分区负载的一个属性
Cassandra 会将数据的 Partition Key 进行 HASH,然后将得到的结果和各节点的 Token 比较,根据以下规则来决定存储到哪个节点上:
- 比 Hash 值大并且离 Hash 值最近的 Token 的节点上
- 如果 Hash 值比最大的 Token 都大,那就存到最小 Token 的节点上
初始化时,可以给每个节点配置 Token,也可以让 Cassandra 自动配置,系统会尽量让每个 Token 容纳的数据量在各个节点间能比较均衡
默认的分区策略是 RandomPartitioner
前面提到 Partition Key 不加 allow filtering 不能排序,其实还有另一种方法,就是把 Partition Key 变成 Token 就可以,比如
select * from my_table where token(pkey1, pkey2) > token(100, 0) and token(pkey1, pkey2) < token(300, 200);
但这种做法可能无法得到想要的结果,因为 token 转换后的大小可能和原值不一样,比如可能会有 token(-1) > token(0) 的情况
安装
https://cassandra.apache.org/doc/latest/getting_started/installing.html
可以下载包 tar 包安装,也可以使用 YUM,APT 安装,以下载 tar 包为例
wget https://mirrors.bfsu.edu.cn/apache/cassandra/3.11.10/apache-cassandra-3.11.10-bin.tar.gz
tar zxvf apache-cassandra-3.11.10-bin.tar.gz
cd apache-cassandra-3.11.10/bin
./cassandra ## 后台运行
./cassandra -f ## 前台运行
./nodetool
./nodetool stopdaemon
其他目录
./conf/cassandra.yaml ## 主配置文件
./logs ## 日志
./data ## 数据,包括 cache,commit log 等
cassandra.yaml 部分配置
cluster_name ## 集群名字
seeds ## 种子节点的 IP,用逗号分开
## (集群中选择若干节点作为种子节点,所有的节点都要指定种子节点,并通过种子节点交流状态信息)
storage_port ## 和其他节点交互的端口
listen_address ## IP 地址
native_transport_port ## 和 client 比如 cqlsh 交互的端口
data_file_directories ## 数据存储目录
commitlog_directory ## commit log 存储目录
saved_caches_directory ## cache 存储目录
hints_directory ## hints 存储目录
可以用 cqlsh 做交互操作
基本操作
$ ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.10 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>
cqlsh>
cqlsh> show host;
Connected to Test Cluster at 127.0.0.1:9042.
cqlsh>
cqlsh>
cqlsh> show version;
[cqlsh 5.0.1 | Cassandra 3.11.10 | CQL spec 3.4.4 | Native protocol v4]
cqlsh>
cqlsh>
cqlsh> CREATE KEYSPACE my_key_space WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};
cqlsh>
cqlsh>
cqlsh> use my_key_space;
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> CREATE TABLE my_table (
... pkey1 int,
... pkey2 int,
... ckey1 int,
... ckey2 int,
... content text,
... PRIMARY KEY ((pkey1, pkey2), ckey1, ckey2)
... );
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> DESCRIBE tables;
my_table
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> DESCRIBE my_table;
CREATE TABLE my_key_space.my_table (
pkey1 int,
pkey2 int,
ckey1 int,
ckey2 int,
content text,
PRIMARY KEY ((pkey1, pkey2), ckey1, ckey2)
) WITH CLUSTERING ORDER BY (ckey1 ASC, ckey2 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> INSERT INTO my_table (pkey1, pkey2, ckey1, ckey2, content) VALUES (11, 12, 13, 14, 'a');
cqlsh:my_key_space> INSERT INTO my_table (pkey1, pkey2, ckey1, ckey2, content) VALUES (21, 22, 23, 24, 'b');
cqlsh:my_key_space> INSERT INTO my_table (pkey1, pkey2, ckey1, ckey2, content) VALUES (31, 32, 33, 34, 'c');
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* WHERE 必须指定所有 PRIMARY KEY */
cqlsh:my_key_space> UPDATE my_table SET content = 'd' where pkey1 = 31 and pkey2 = 32 and ckey1 = 33 and ckey2 = 34;
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> select * from my_table;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
11 | 12 | 13 | 14 | a
31 | 32 | 33 | 34 | d
21 | 22 | 23 | 24 | b
(3 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* WHERE 必须指定所有 Partition Key (就是 pkey1 和 pkey2,而 ckey1 和 ckey2 是 Clustering Key) */
cqlsh:my_key_space> select * from my_table where pkey1 = 31 and pkey2 = 32;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
31 | 32 | 33 | 34 | d
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 除非添加 ALLOW FILTERING 但这会影响性能,因为需要扫描所有 Partition Key */
cqlsh:my_key_space> select * from my_table where pkey1 = 31 ALLOW FILTERING;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
31 | 32 | 33 | 34 | d
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 不能跳过 ckey1 查询 ckey2,也不能跳过 partition key 查询 clustering key */
cqlsh:my_key_space> select * from my_table where pkey1 = 31 and pkey2 = 32 and ckey1 = 33 and ckey2 = 34;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
31 | 32 | 33 | 34 | d
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 除非添加 ALLOW FILTERING 但这会影响性能,因为需要扫描所有 ckey1 */
cqlsh:my_key_space> select * from my_table where pkey1 = 31 and pkey2 = 32 and ckey2 = 34 ALLOW FILTERING;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
31 | 32 | 33 | 34 | d
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 不能对 partition key 做范围查询,除非添加 ALLOW FILTERING,但会影响性能,而 clustering key 则可以 */
cqlsh:my_key_space> select * from my_table where pkey1 = 11 and pkey2 > 10 ALLOW FILTERING;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
11 | 12 | 13 | 14 | null
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 创建索引,然后可以直接对索引列查询了 */
cqlsh:my_key_space> CREATE INDEX my_index_ckey2_on_my_table ON my_table (ckey2);
cqlsh:my_key_space>
cqlsh:my_key_space> select * from my_table where ckey2 = 14;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
11 | 12 | 13 | 14 | null
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 使用 list、map、set 数据类型 */
cqlsh:my_key_space> CREATE TABLE data(name text PRIMARY KEY, email list<text>, phone set<varint>, address map<text, text>);
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> INSERT INTO data(name, email) VALUES ('LIN', ['lin@gmail.com', 'lin@yahoo.com']);
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> INSERT INTO data(name, phone) VALUES ('LIN', {12345678, 87654321});
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> INSERT INTO data(name, address) VALUES ('LIN', {'home':'A' , 'office':'B' } );
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> select * from data;
name | address | email | phone
------+------------------------------+------------------------------------+----------------------
LIN | {'home': 'A', 'office': 'B'} | ['lin@gmail.com', 'lin@yahoo.com'] | {12345678, 87654321}
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 自定义数据类型 */
cqlsh:my_key_space> CREATE TYPE card_details (
... num int,
... pin int,
... name text,
... cvv int,
... phone varint
... );
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> CREATE TABLE data2(name text PRIMARY KEY, card frozen<card_details>); /* 使用自定义类型最好加上 frozen */
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> INSERT INTO data2(name, card) VALUES ('LIN', {num:123456, pin:36, name:'LIN', cvv:123, phone:13912345678} );
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> SELECT * FROM data2;
name | card
------+-------------------------------------------------------------------
LIN | {num: 123456, pin: 36, name: 'LIN', cvv: 123, phone: 13912345678}
(1 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 批处理 */
cqlsh:my_key_space> BEGIN BATCH
... INSERT INTO my_table (pkey1, pkey2, ckey1, ckey2, content) VALUES (51, 52, 53, 54, 'e');
... UPDATE my_table SET content = 'c' where pkey1 = 31 and pkey2 = 32 and ckey1 = 33 and ckey2 = 34;
... DELETE content FROM my_table WHERE pkey1 = 11 and pkey2 = 12 and ckey1 = 13 and ckey2 = 14;
... APPLY BATCH;
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> select * from my_table;
pkey1 | pkey2 | ckey1 | ckey2 | content
-------+-------+-------+-------+---------
11 | 12 | 13 | 14 | null
51 | 52 | 53 | 54 | e
31 | 32 | 33 | 34 | c
21 | 22 | 23 | 24 | b
(4 rows)
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> /* 查看和配置一致性等级 */
cqlsh:my_key_space> Consistency
Current consistency level is ONE.
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> Consistency TWO
Consistency level set to TWO.
cqlsh:my_key_space>
cqlsh:my_key_space>
cqlsh:my_key_space> Consistency
Current consistency level is TWO.
cqlsh:my_key_space>
主要是 key 必须按序指定,不能跳过前面的 key,并且 partition key 不知道范围查询,虽然说是为了保证效率,但还是比较其他数据库怪
python
安装
pip3 install cassandra-driver
代码
from cassandra.cluster import Cluster
cluster = Cluster(['172.17.0.1'], port=9042)
session = cluster.connect('my_key_space')
cql='''
select
*
from
my_table
where
pkey1 = 11 and
pkey2 > 10
ALLOW FILTERING
'''
result = session.execute(cql)
for i in result:
print(i)
cql 很接近 sql,虽然功能比较弱一些
Spark
启动
./pyspark
--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.2
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
--conf spark.cassandra.connection.host=localhost
代码
df = spark.read.format("org.apache.spark.sql.cassandra")
.options(table="my_table", keyspace="my_key_space")
.load()
df.write.format("org.apache.spark.sql.cassandra")
.options(keyspace='my_key_space', table='my_table')
.mode('append')
.save()