Cassandra使用pycassa批量导入数据

简介:

本周接手了一个Cassandra系统的维护工作,有一项是需要将应用方的数据导入我们维护的Cassandra集群,并且为应用方提供HTTP的方式访问服务。这是我第一次接触KV系统,原来只是走马观花似的看过KV啊,NoSQL啊。但是实际上没有实际的使用经验。经过两天的学习和接手,终于搞明白了在生产环境中的使用方式。在此简要的笔记一下。本文主要包括的内容有:

Cassandra的简介,

Cassandra的相关CLI

Cassandra的Python API,并且给出一个批量导入数据的例子。


1. Cassandra简介

Cassandra的主要特点就是它不是一个数据库,而是由一堆数据库节点共同构成的一个分布式网络服务,对Cassandra 的一个写操作,会被复制到其他节点上去,对Cassandra的读操作,也会被路由到某个节点上面去读取。对于一个Cassandra群集来说,扩展性能 是比较简单的事情,只管在群集里面添加节点就可以了。

Cassandra是一个混合型的非关系的数据库,类似于Google的BigTable。其主要功能比 Dynomite(分布式的Key-Value存 储系统)更丰富,但支持度却不如文档存储MongoDB(介于关系数据库和非关系数据库之间的开源产品,是非关系数据库当中功能最丰富,最像关系数据库 的。支持的数据结构非常松散,是类似json的bjson格式,因此可以存储比较复杂的数据类型。)Cassandra最初由Facebook开发,后转变成了开源项目。它是一个网络社交云计算方面理想的数据库。以Amazon专有的完全分布式的Dynamo为基础,结合了Google BigTable基于列族(Column Family)的数据模型。P2P去中心化的存储。很多方面都可以称之为Dynamo 2.0。

和其他数据库比较,有几个突出特点:

  1. 模式灵活 :使用Cassandra,像文档存储,你不必提前解决记录中的字段。你可以在系统运行时随意的添加或移除字段。这是一个惊人的效率提升,特别是在大型部 署上。
  2. 真正的可扩展性 :Cassandra是纯粹意义上的水平扩展。为给集群添加更多容量,可以指向另一台电脑。你不必重启任何进程,改变应用查询,或手动迁移任何数据。
  3. 多数据中心识别 :你可以调整你的节点布局来避免某一个数据中心起火,一个备用的数据中心将至少有每条记录的完全复制。

一些使Cassandra提高竞争力的其他功能:

  1. 范围查询 :如果你不喜欢全部的键值查询,则可以设置键的范围来查询。
  2. 列表数据结构 :在混合模式可以将超级列添加到5维。对于每个用户的索引,这是非常方便的。
  3. 分布式写操作 :可以在任何地方任何时间集中读或写任何数据。并且不会有任何单点失败。

2. 基础命令

连接

./cassandra-cli-h 10.224.52.73 -port 9160

集群式自动负载的,因此连接任意一个节点即可。

Check schema

show schema;

在创建了schema或者列族后,可以使用时命令确认是否成功

在运行改命令前,需要使用命令use keyspace_name; 否则会遇到以下错误:

Not authorized to a working keyspace

 

list

list column_family_name;

可以显示列族的前100列。


3. 批量导入

实验数据来自搜狗实验室的中文词语搭配库,http://www.sogou.com/labs/dl/r.html

数据格式如下:

词语1_词语2 \t 两个词共同出现的次数

在这里并不讨论该数据的具体意义,只是以这个数据为起点来说明如何向应用方提供服务。

部分实际数据:

  都要_打牌>--4                       

  等候_一次>--26 

   本刊_重要>--3                                                                                                                                                                  关系_全方位>14                                                                                                                                                                加热_迅速>--107     

设计列族名为 test_only, cli 如下:

create column family test_only

 with column_type = 'Standard'

  andcomparator = 'UTF8Type'

  anddefault_validation_class = 'BytesType'

  andkey_validation_class = 'UTF8Type'

  andread_repair_chance = 0.1

  anddclocal_read_repair_chance = 0.0

  andgc_grace = 864000

  andmin_compaction_threshold = 4

  andmax_compaction_threshold = 32

  andreplicate_on_write = true

  andcompaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'

  andcaching = 'KEYS_ONLY'

  and column_metadata = [

    {column_name : 'count',

    validation_class : UTF8Type}]

  andcompression_options = {'sstable_compression' :'org.apache.cassandra.io.compress.SnappyCompressor'};

 

连接到Cassandra:pycassa.ConnectionPool(‘keyspace_name’, server_list)

具体到我们的例子就是:

con = pycassa.ConnectionPool('History',server_list=["server1:9160", "server2:9160","server3:9160"])

 
获取列族:

cf = pycassa.ColumnFamily(con, cfName)


插入一条数据:

cf.insert('row_key', {'col_name': 'col_val'})  


批量插入:

cf.batch_insert({'row1': {'name1': 'val1', 'name2': 'val2'},                                           'row2': {'foo': 'bar'}})  


获取一条数据:

cf.get(‘row_key’)


获取某一列的值:

cf.get(‘row_key’)[‘column_name’]

下面是具体的代码实现:

import pycassa                                                                                                                                                                                                                              
import time                                                                                                                                                                                                                                 
batch_size = 100                                                                                                                                                                                                                            
def pycassa_connect():                                                                                                                                                                                                                      
                                                                                                                                                                                                                                            
    #start = time.time()                                                                                                                                                                                                                    
    return pycassa.ConnectionPool('History', server_list=["192.168.1.20:9160"])                                                                                                                 
    #end = time.time()                                                                                                                                                                                                                      
    #print "Mola init time: ", (end - start)                                                                                                                                                                                                
                                                                                                                                                                                                                                            
def batch_insert(file_path, cf):                                                                                                                                                                                                            
    global batch_size                                                                                                                                                                                                                       
    f = open(file_path, "r")                                                                                                                                                                                                                
    count=0                                                                                                                                                                                                                                 
    error_count = 0                                                                                                                                                                                                                         
    kvmap = {}                                                                                                                                                                                                                              
    for line in f:--                                                                                                                                                                                                                        
        list = line.split("\t")                                                                                                                                                                                                             
        if len(list) != 2 :                                                                                                                                                                                                                 
            print "skip error data"                                                                                                                                                                                                         
            continue                                                                                                                                                                                                                        
        column = {}                                                                                                                                                                                                                         
        column['count'] = list[1].replace('\n', '')                                                                                                                                                                                         
        try:                                                                                                                                                                                                                                
            kvmap[list[0].decode('gb2312').encode('utf-8')] = column-                                                                                                                                                                       
            if len(kvmap) % batch_size == 0:                                                                                                                                                                                                
                cf.batch_insert(kvmap)                                                                                                                                                                                                      
                kvmap.clear()                                                                                                                                                                                                               
            count = count + 1                                                                                                                                                                                                               
        except Exception, ex:                                                                                                                                                                                                               
            print "found execption"                                                                                                                                                                                                         
            print ex                                                                                                                                                                                                                        
            error_count = error_count + 1                                                                                                                                                                                                   
    f.close()                                                                                                                                                                                                                               
    if len(kvmap) > 0 :                                                                                                                                                                                                                     
        cf.batch_insert(kvmap)                                                                                                                                                                                                              
----                                                                                                                                                                                                                                        
    for key in kvmap:                                                                                                                                                                                                                       
        print "key is %s, value is %s"%(key, kvmap[key])-                                                                                                                                                                                   
    print "total insert data is %d, error is %d"%(count, error_count)

如何测试数据是正确的?

def test_after_insert(file_path, cf):                                                                                                                                                                                                       
    f = open(file_path, "r")                                                                                                                                                                                                                
    error_count=0                                                                                                                                                                                                                           
    print "Test started"                                                                                                                                                                                                                    
    for line in f:--                                                                                                                                                                                                                        
        list = line.split("\t")                                                                                                                                                                                                             
        if len(list) != 2 :                                                                                                                                                                                                                 
            print "skip error data"                                                                                                                                                                                                         
            continue                                                                                                                                                                                                                        
        count = list[1].replace('\n', '')                                                                                                                                                                                                   
        if cf.get(list[0].decode('gb2312').encode('utf-8'))['count'] != count:                                                                                                                                                              
            print "Key %s doesn't match value %s"%(list[0].decode('gb2312').encode('utf-8'), count)                                                                                                                                         
            error_count = error_count + 1                                                                                                                                                                                                   
    print "Test completed, found %d error(s)."%error_count                                                                                                                                                                                  
    f.close()  


目录
相关文章
|
30天前
|
存储 数据库
在TiDB中查询数据
【2月更文挑战第29天】在TiDB中查询数据涉及基本语法如SELECT,条件查询(WHERE子句),排序(ORDER BY),分组(GROUP BY)和聚合函数(COUNT, SUM等)。LIMIT与OFFSET用于分页,子查询和连接查询处理复杂场景。注意列名和表名准确性,优化查询以提升性能,谨慎使用更新或删除操作。
|
存储 缓存 分布式计算
大数据开发笔记(十):Hbase列存储数据库总结
HBase 本质上是一个数据模型,可以提供快速随机访问海量结构化数据。利用 Hadoop 的文件系统(HDFS)提供的容错能 力。它是 Hadoop 的生态系统,使用 HBase 在 HDFS 读取消费/随机访问数据,是 Hadoop 文件系统的一部分。
889 0
大数据开发笔记(十):Hbase列存储数据库总结
|
存储 算法 数据库
HBase在时间序列数据库中的应用
2017云栖大会HBase专场,阿里巴巴高级技术专家悠你带来题为HBase在时间序列数据库中的应用的演讲。本文主要从时序数据和数据库说起,着重分享了HiTSDB针对时序场景的优化,最后分析了HBase作为底层存储的优势。
7315 0
|
11天前
|
JSON NoSQL MongoDB
mongodb导出聚合查询的数据
mongodb导出聚合查询的数据
|
4月前
|
缓存 分布式计算 NoSQL
分布式NoSQL列存储数据库Hbase_MR集成Hbase:读写Hbase规则(九)
分布式NoSQL列存储数据库Hbase_MR集成Hbase:读写Hbase规则(九)
38 0
|
6月前
|
Java 关系型数据库 MySQL
21Solr批量导入数据
21Solr批量导入数据
23 0
|
关系型数据库 MySQL Java
Solr从myslq批量导入数据
Solr从myslq批量导入数据
64 0
Solr从myslq批量导入数据
|
JavaScript 前端开发 NoSQL
初识Mongdb之数据查询篇(三)
初识Mongdb之数据查询篇(三)
72 0
初识Mongdb之数据查询篇(三)
|
JavaScript 前端开发 关系型数据库
初识Mongdb之数据查询篇(二)
初识Mongdb之数据查询篇(二)
117 0
初识Mongdb之数据查询篇(二)
|
消息中间件 分布式计算 关系型数据库
阿里云 E-MapReduce ClickHouse 操作指南 04 期 — 数据导入
阿里云 E-MapReduce(简称 EMR )是运行在阿里云平台上的一种大数据处理的系统解决方案。ClickHouse 作为开源的列式存储数据库,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。而阿里云 EMR ClickHouse 则提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。
阿里云 E-MapReduce ClickHouse 操作指南 04 期 — 数据导入