您的位置:首页 > 数据库

NoSQL_教程三_Cassandra_用Java代码操作 Cassandra

2016-09-12 20:42 435 查看
Cassandra 作为一门优秀的非关系型数据库, 用途比较广泛。

本文不是一篇Cassandra的介绍性文章,主要讲解了如何利用Java代码操作Cassandra数据库, 如果对Cassandra还不是很了解,可以参考前面的两篇教程。

与 Cassandra交互的常用的类:

Cluster类:操作集群,控制连接节点和一些属性,项目中只需要定义一个

Session类:执行CQL语句,项目中只需要定义一个

ResultSet类:每次同步执行CQL都会返回这个类

ResultSetFuture类:每次异步执行CQL都会返回这个类

PreparedStatement类:可以预定义CQL

Statement类:定义CQL,可以指定查询属性,如fetchSize

Row类:查询的结果中的一行数据

QueryBuilder类:可以动态构造CQL中的Select、Insert、Update、Delete

Cluster

Cluster cluster = Cluster.builder()

.addContactPoints("127.0.0.1", "127.0.0.2")

.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)

.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))

.build();

cluster.getConfiguration()

.getProtocolOptions()

.setCompression(ProtocolOptions.Compression.LZ4);

Cluster 四种Retry重试策略:(对数据库的操作失败后的重试)

DefaultRetryPolicy

没有查询到数据,或者写入时超时的情况下进行重新查询

DowngradingConsistencyRetryPolicy

与DefaultRetryPolicy一样,不同点是考虑了最终数据一致性问题

FallthroughRetryPolicy

这一策略不重试查询,但允许客户端业务逻辑实现重试

LoggingRetryPolicy

不重试查询,用来记录日志信息,info级别

Cluster 两种节点重连策略:(节点连接不上重新连接)

ConstantReconnectionPolicy

固定时间间隔进行重连

ExponentialReconnectionPolicy

指数级别增加重连时间,但不会超过最大重连时间

SQL

session执行CQL查询的两种模式:

同步执行

ResultSet rs = session.execute("SELECT * FROM KEYSPACE1.CF1");

异步执行

ResultSetFuture rs = session.executeAsync("SELECT * FROM KEYSPACE1.CF1");

同步执行与异步执行结果的获取

ResultSet类代表执行CQL的结果信息,如果是查询操作,可以用如下方法获取所有数据:

List<Row> rowsList = rs.all();

ResultSetFuture是以异步非阻塞方式获取数据,可以通过如下方法获取ResultSet对象:

ResultSet rs = rsf.getUninterruptibly();

预定义CQL语句

PreparedStatement statement = getSession().prepare(

"INSERT INTO simplex.songs " +

"(id, title, album, artist, tags) " +

"VALUES (?, ?, ?, ?, ?);");

statement.bind(1231,"标题","专辑","艺术家","标签")

用QueryBuilder动态生成CQL语句

Insert insert = QueryBuilder

.insertInto("addressbook", "contact") // 空间(数据库)名字,列族名字

.value("firstName", "Dwayne") // 列名,列的值

.value("lastName", "Garcia")

.value("email", "dwayne@example.com";

ResultSet results = session.execute(insert);

下面的例子使用的 jar 为 cassandra-driver-core 

给出 jar 的maven dependency

<!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>2.1.9</version>
</dependency>

下面分别给出三个例子 展示如何利用Java代码操作Cassandra 操作数据库。

1.  使用SQL操作 Cassandra 数据库

2.  使用cassandra-driver-core 封装的 QueryBuilder 操作数据库

3.  使用Preparement SQL 操作 Cassandra数据库

1.使用SQL操作 Cassandra 数据库

package cassandra;

import java.util.List;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

/**
* Hello world!
*
*/
public class Cassandra_CQL
{
public static void main( String[] args )
{
Cluster cluster = null;
Session session = null;

try {

//定义一个Cluster类
cluster = Cluster.builder().addContactPoint("127.0.0.1").build();

//需要获取Session对象
session = cluster.connect();

//创建键空间
String createKeySpaceCQL = "create keyspace if not exists keyspace1 with replication={'class':'SimpleStrategy', 'replication_factor': 1}";
session.execute(createKeySpaceCQL);

//创建列族
String createTableCQL = "create table if not exists keyspace1.student(name varchar primary key, age int)";
session.execute(createTableCQL);

//插入数据
String insertCQL = "insert into keyspace1.student(name,age) values('zhangsan',20)";
session.execute(insertCQL);

//查询,修改,删除数据
String queryCQL = "select * from keyspace1.student";
ResultSet rs = session.execute(queryCQL);
List<Row> dataList = rs.all();
for (Row row : dataList) {
System.out.println("==>name: "+ row.getString("name"));
System.out.println("==>age: "+row.getInt("age"));
}

//修改
String updateCQL = "update keyspace1.student set age=22 where name='zhangsan'";
session.execute(updateCQL);
rs = session.execute(queryCQL);
dataList = rs.all();
for (Row row : dataList) {
System.out.println("==>name: "+ row.getString("name"));
System.out.println("==>age: "+row.getInt("age"));
}

//删除数据
String deleteCQL = "delete from keyspace1.student where name='zhangsan'";
session.execute(deleteCQL);
rs = session.execute(queryCQL);
dataList = rs.all();
for (Row row : dataList) {
System.out.println("==>name: "+ row.getString("name"));
System.out.println("==>age: "+row.getInt("age"));
}

} catch (Exception e) {

e.printStackTrace();
}finally{

//关闭Session和Cluster
session.close();
cluster.close();
}

}
}


2.使用cassandra-driver-core 封装的 QueryBuilder 操作数据库

package cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select.Where;

public class Cassandra_builder {

public static void main( String[] args )
{
Cluster cluster = null;
Session session = null;

try {

//定义一个Cluster类
cluster = Cluster.builder().addContactPoint("127.0.0.1").build();

//需要获取Session对象
session = cluster.connect();

//创建键空间
String createKeySpaceCQL = "create keyspace if not exists keyspace1 with replication={'class':'SimpleStrategy', 'replication_factor': 1}";
session.execute(createKeySpaceCQL);

//创建列族
String createTableCQL = "create table if not exists keyspace1.student(name varchar primary key, age int)";
session.execute(createTableCQL);

//新增数据
Insert insert = QueryBuilder.insertInto("keyspace1", "student").value("name", "lisi").value("age", 11);
session.execute(insert);
System.out.println("插入语句: "+insert);

System.out.println("查询数据");
//查询数据
Where select = QueryBuilder.select().all().from("keyspace1", "student").where(QueryBuilder.eq("name", "lisi"));
ResultSet rs = session.execute(select);
for(Row row : rs.all()){
System.out.println("=>name: "+row.getString("name"));
System.out.println("=>age: "+row.getInt("age"));
}
System.out.println("查询语句: "+select);

System.out.println("修改数据");
//修改数据
com.datastax.driver.core.querybuilder.Update.Where update = QueryBuilder.update("keyspace1", "student").with(QueryBuilder.set("age", 21)).where(QueryBuilder.eq("name", "lisi"));
session.execute(update);
rs = session.execute(select);
for(Row row : rs.all()){
System.out.println("=>name: "+row.getString("name"));
System.out.println("=>age: "+row.getInt("age"));
}
System.out.println("修改语句: "+update);

System.out.println("删除数据");
//删除数据
com.datastax.driver.core.querybuilder.Delete.Where delete = QueryBuilder.delete().from("keyspace1","student").where(QueryBuilder.eq("name", "lisi"));
session.execute(delete);
rs = session.execute(select);
for(Row row : rs.all()){
System.out.println("=>name: "+row.getString("name"));
System.out.println("=>age: "+row.getInt("age"));
}
System.out.println("删除语句: "+delete);

} catch (Exception e) {

e.printStackTrace();
}finally{

//关闭Session和Cluster
session.close();
cluster.close();
}

}

}


3. 使用Preparement SQL 操作 Cassandra数据库 

package cassandra;

import java.util.List;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class Cassandra_preCQL {

public static void main(String[] args) {
Cluster cluster = null;
Session session = null;

try {

//定义一个Cluster类
cluster = Cluster.builder().addContactPoint("127.0.0.1").build();

//需要获取Session对象
session = cluster.connect();

//创建键空间
String createKeySpaceCQL = "create keyspace if not exists keyspace1 with replication={'class':'SimpleStrategy', 'replication_factor': 1}";
session.execute(createKeySpaceCQL);

//创建列族
String createTableCQL = "create table if not exists keyspace1.student(name varchar primary key, age int)";
session.execute(createTableCQL);

//插入数据
PreparedStatement statement = session.prepare("insert into keyspace1.student(name,age) values(?,?)");
session.execute(statement.bind("zhangsan",40));

//查询,修改,删除数据
String queryCQL = "select * from keyspace1.student";
ResultSet rs = session.execute(queryCQL);
List<Row> dataList = rs.all();
for (Row row : dataList) {
System.out.println("==>name: "+ row.getString("name"));
System.out.println("==>age: "+row.getInt("age"));
}

} catch (Exception e) {

e.printStackTrace();
}finally{

//关闭Session和Cluster
session.close();
cluster.close();
}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: