您的位置:首页 > 理论基础 > 计算机网络

SolrJ 操作HttpSolrServer, ConcurrentUpdateSolrServer和CloudSolrServer

2015-08-25 19:21 483 查看
问题导读:

1、如何实现SolrJ通过连接池连接数据库?

2、如何实现SolrJ的CRUD操作?



HttpSolrServer 使用了Apache Commons HTTP客户端来连接Solr。注意在Solr 4.x中,CommonsHttpSolrServer已经改名为HttpSolrServer,StreamingUpdateSolrServer已经改名为ConcurrentUpdateSolrServer。ConcurrentUpdateSolrServer更适合update操作,而HttpSolrServer更适合query操作。

添加document或是修改document。假如这个document已经存在,就会update这个document。代码片段如下:

/**
 * Indexes sample customer documents into the Solr core.
 *
 * <p>The loop counter is used as the {@code customer_id} of each document.
 * Documents are buffered and sent/committed in batches of 1000; whatever is
 * still buffered after the loop is flushed at the end.
 *
 * @throws IOException propagated from the SolrJ add/commit calls
 * @throws SolrServerException propagated from the SolrJ add/commit calls
 */
public void indexDocs() throws IOException, SolrServerException {
    server.setParser(new XMLResponseParser());

    // Buffer of documents waiting to be sent to Solr.
    Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();

    /*
     * Original author's note: one thousand records took about an hour and produced
     * 660M; running concurrently with multiple threads would probably be better.
     */
    for (int i = 10000000; i < 10000002; i++) {
        /* i is used as identification of a document, which is treated as unique key. */
        SolrInputDocument doc2 = new SolrInputDocument();
        doc2.addField("customer_id", i);
        doc2.addField("name", "John Natch-" + i);
        doc2.addField("level", "VIP");
        doc2.addField("sex", "男");
        doc2.addField("address", "【【【【【金刚金刚金刚金刚金刚金】】】】" + i);
        System.out.println("add doc " + i);
        docs.add(doc2);

        if (docs.size() == 1000) {
            server.add(docs);
            server.commit();
            logger.info("commit 1000 doc " + i);
            docs.clear();
        }

        /*
         * To immediately commit after adding documents, you could use:
         *
         *   UpdateRequest req = new UpdateRequest();
         *   req.setAction( UpdateRequest.ACTION.COMMIT, false, false );
         *   req.add( docs );
         *   UpdateResponse rsp = req.process( server );
         */
    }

    // Flush the remainder. The buffer is empty when the document count is an exact
    // multiple of the batch size; skip the request then, because an update request
    // without documents carries no content (NOTE(review): Solr is expected to
    // reject such a request -- confirm against the SolrJ version in use).
    if (!docs.isEmpty()) {
        server.add(docs);
        server.commit();
    }
    logger.info("Commits successfully!......");
}

复制代码
在执行代码之前,需要先在Solr core的配置文件schema.xml中配置具体的字段。

<!-- Field definitions for the 'customer' core. The names match the fields that the
     indexing code adds to each document: customer_id, name, sex, level, address. -->

<!-- Integer key field: required and single-valued; it is also the field named by
     the <uniqueKey> element of this core. -->
<field name="customer_id" type="int" indexed="true" stored="true" required="true" multiValued="false"/>

<field name="name" type="string" indexed="true" stored="true"/>

<!-- Declared indexed but not stored. NOTE(review): presumably searchable but not
     returned in query results; confirm against the Solr schema documentation. -->
<field name="sex" type="string" indexed="true" stored="false"/>

<field name="level" type="string" indexed="true" stored="true"/>

<!-- The only field declared multiValued, i.e. a document may carry several addresses. -->
<field name="address" type="string" indexed="true" multiValued="true" stored="true"/>

复制代码

<!-- customer_id identifies a document in this core. -->
<uniqueKey>customer_id</uniqueKey>

复制代码

提交操作(如需删除文档,可使用代码注释中的 server.deleteById):

/**
 * Sends the given documents to Solr, commits them, and then empties the collection
 * so the caller can reuse it as a buffer.
 *
 * <p>Failures are logged and not rethrown; in that case the collection is left
 * untouched, so the documents are still buffered when the caller tries again.
 *
 * @param docs documents to index; a {@code null} or empty collection is a no-op
 */
private void commitDocs(Collection<SolrInputDocument> docs) {
    // Nothing to send: avoid an add/commit round-trip with no documents.
    if (docs == null || docs.isEmpty()) {
        return;
    }
    try {
        // To delete instead of add, call server.deleteById(...) here with the
        // ids of the documents to remove.
        server.add(docs);
        server.commit();
        docs.clear();
    } catch (SolrServerException e) {
        logger.error("SolrServerException", e);
    } catch (IOException e) {
        logger.error("IOException", e);
    }
}

复制代码
与数据库集成:使用SolrJ读取数据库中的数据并建立索引。当然,这也可以使用Solr DIH(DataImportHandler)实现。两种方式各有优缺点,可根据实际应用选择具体的实现方式。

/**
 * Reads every row of the {@code customer} table and indexes each row as one Solr
 * document, using the lower-cased column names reported by the data source as
 * field names.
 *
 * <p>Documents are handed to {@code commitDocs} in batches of 100; the remainder
 * is flushed after the last row.
 */
public void indexDocsWithDB() {
    final int batchSize = 100;

    PoolingDataSourceDemo dataSource = new PoolingDataSourceDemo();
    List<List<Object>> rows = dataSource.executeQuerySQL("select * from customer");
    // Column names are stored 1-based (index 0 is unused), matching JDBC numbering.
    String[] columnNames = dataSource.getColNames();

    Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
    for (List<Object> row : rows) {
        SolrInputDocument doc = new SolrInputDocument();
        for (int col = 0; col < row.size(); col++) {
            // Row values are 0-based, column names 1-based.
            String fieldName = columnNames[col + 1];
            Object value = row.get(col);
            doc.addField(fieldName, value);
            logger.info(fieldName + "add filed " + value);
        }
        docs.add(doc);

        // Flush as soon as a full batch is buffered (the former "> 100" check
        // sent 101 documents per batch).
        if (docs.size() >= batchSize) {
            commitDocs(docs);
        }
    }

    if (docs.size() > 0) {
        commitDocs(docs);
    }
}

复制代码
完整的代码:

PoolingDataSourceDemo.java 实现通过连接池连接数据库。

import net.spy.memcached.compat.log.Logger;

import net.spy.memcached.compat.log.LoggerFactory;

import org.apache.commons.dbcp.*;

import org.apache.commons.pool.impl.GenericObjectPool;

import javax.sql.DataSource;

import java.sql.*;

import java.util.ArrayList;

import java.util.LinkedList;

import java.util.List;

/**
 * Demonstrates querying a database through a pooled {@link DataSource} built from
 * Apache Commons DBCP / Commons Pool classes.
 *
 * <p>The connection pool and the data source are static and therefore shared by
 * all instances. The column names of the most recent query are kept per instance
 * and exposed through {@link #getColNames()}.
 *
 * @author John Liu
 */
public class PoolingDataSourceDemo {

    private final static Logger logger = LoggerFactory.getLogger(PoolingDataSourceDemo.class);

    /* These properties can be configured in a properties type file */
    private final static String CONNECTION_URL = "jdbc:mysql://localhost/pythondb?autoReconnect=true";

    private final static String DRIVER_CLASS = "com.mysql.jdbc.Driver";

    private final static String USER_NAME = "elite";

    private final static String PASSWORD = "elite";

    private final static int MAX_ACTIVE_NUMBER = 10;

    // Assigned by initDataSource(); must be declared before the static block below.
    private static GenericObjectPool connectionPool = null;

    /* Lower-cased column names of the last query, 1-based (index 0 is unused). */
    private String[] colNames;

    private static DataSource dataSource;

    static {
        dataSource = initDataSource();
    }

    /**
     * Returns the shared connection pool.
     *
     * @return the pool created by {@link #initDataSource()}
     */
    public GenericObjectPool getConnectionPool() {
        return connectionPool;
    }

    /**
     * Runs the given query and returns all rows.
     *
     * <p>As a side effect the lower-cased column names of the result are stored and
     * can be read with {@link #getColNames()}. A {@link SQLException} is logged and
     * not rethrown; the rows read up to that point (possibly none) are returned.
     *
     * @param querySQL the SQL query to execute
     * @return one list per row, each holding the column values in column order;
     *         never {@code null}
     */
    public List<List<Object>> executeQuerySQL(String querySQL) {
        Connection conn = null;
        Statement stmt = null;
        ResultSet resultSet = null;
        List<List<Object>> result = new LinkedList<List<Object>>();

        try {
            logger.info("Creating connection.");
            conn = dataSource.getConnection();
            stmt = conn.createStatement();
            resultSet = stmt.executeQuery(querySQL);

            // show the connection pool status
            printDataSourceStats();

            logger.info("Results:");
            ResultSetMetaData rsm = resultSet.getMetaData();
            int columnCount = rsm.getColumnCount();

            // JDBC columns are numbered from 1, so slot 0 stays unused.
            colNames = new String[columnCount + 1];
            for (int i = 1; i <= columnCount; i++) {
                // Locale.ROOT keeps the lower-casing independent of the JVM's
                // default locale (e.g. the Turkish dotless i).
                colNames[i] = rsm.getColumnName(i).toLowerCase(java.util.Locale.ROOT);
                logger.info("column name: " + colNames[i]);
            }

            while (resultSet.next()) {
                List<Object> row = new ArrayList<Object>();
                for (int i = 1; i <= columnCount; i++) {
                    row.add(getColumnValue(rsm, resultSet, colNames, i));
                }
                result.add(row);
            }
        } catch (SQLException e) {
            // Only log: the pool is static and shared, so one failed query must not
            // shut it down for every other user of this class.
            logger.error("SQLException while executing query: " + querySQL, e);
        } finally {
            closeQuietly(resultSet);
            closeQuietly(stmt);
            closeQuietly(conn);
        }

        // Returning after (not inside) the finally block, so that unexpected
        // runtime exceptions are no longer silently discarded.
        logger.info("result size: " + result.size());
        return result;
    }

    /**
     * Reads the value of column {@code j} of the current row, using a getter that
     * matches the column's JDBC type. Types without a dedicated case are read as
     * strings. SQL NULL is returned as {@code null} for every type.
     *
     * @param rsm      meta data of {@code rs}
     * @param rs       result set positioned on the row to read
     * @param colNames 1-based column names; the column is skipped (and {@code null}
     *                 returned) when {@code colNames[j]} is {@code null}
     * @param j        1-based column index
     * @return the column value, or {@code null}
     * @throws SQLException if the underlying JDBC calls fail
     */
    public Object getColumnValue(ResultSetMetaData rsm, ResultSet rs, String[] colNames, int j) throws SQLException {
        Object f = null;

        if (colNames[j] != null) {
            switch (rsm.getColumnType(j)) {
                case Types.BIGINT:
                    f = rs.getLong(j);
                    break;
                case Types.INTEGER:
                    f = rs.getInt(j);
                    break;
                case Types.DATE:
                    f = rs.getDate(j);
                    break;
                case Types.FLOAT:
                    f = rs.getFloat(j);
                    break;
                case Types.DOUBLE:
                    f = rs.getDouble(j);
                    break;
                case Types.TIME:
                    // Was getDate(j), which does not read the time-of-day value.
                    f = rs.getTime(j);
                    break;
                case Types.BOOLEAN:
                    f = rs.getBoolean(j);
                    break;
                default:
                    f = rs.getString(j);
            }

            // The primitive getters report SQL NULL as 0 / false; wasNull() tells
            // whether the value just read was really NULL.
            if (rs.wasNull()) {
                f = null;
            }
        }

        logger.info("column value: " + f);
        return f;
    }

    /**
     * Loads the JDBC driver, creates the shared connection pool and wraps it in a
     * pooling {@link DataSource}.
     *
     * <p>The connection URL, user name and password come from the constants of this
     * class. A driver that cannot be loaded is logged; the data source is still
     * returned in that case.
     *
     * @return DataSource backed by the shared connection pool
     */
    public static DataSource initDataSource() {
        // Load JDBC Driver class.
        try {
            Class.forName(DRIVER_CLASS).newInstance();
        } catch (InstantiationException e) {
            logger.error("InstantiationException error", e);
        } catch (IllegalAccessException e) {
            logger.error("IllegalAccessException error", e);
        } catch (ClassNotFoundException e) {
            logger.error("ClassNotFoundException error", e);
        }

        // Pool that holds the connection objects.
        connectionPool = new GenericObjectPool();
        connectionPool.setMaxActive(MAX_ACTIVE_NUMBER);

        // Factory the pool uses to create connections from the JDBC url,
        // user name and password.
        ConnectionFactory cf = new DriverManagerConnectionFactory(
                CONNECTION_URL,
                USER_NAME,
                PASSWORD);

        // Wraps the connections created by cf so that they can be pooled. The
        // instance is not referenced again. NOTE(review): in DBCP 1.x the
        // constructor registers itself with connectionPool, and the trailing
        // arguments are (statement pool factory, validation query, defaultReadOnly,
        // defaultAutoCommit) -- confirm against the DBCP version in use.
        new PoolableConnectionFactory(cf, connectionPool,
                null, null, false, true);

        return new PoolingDataSource(connectionPool);
    }

    /** Logs the maximum, active and idle connection counts of the shared pool. */
    public void printDataSourceStats() {
        logger.info("Max : " + getConnectionPool().getMaxActive() + "; " +
                "Active: " + getConnectionPool().getNumActive() + "; " +
                "Idle : " + getConnectionPool().getNumIdle());
    }

    /**
     * Shuts the given data source down.
     *
     * <p>A {@code BasicDataSource} is closed directly. Any other data source
     * (including the {@code PoolingDataSource} created by this class, which the
     * previous unconditional cast could not handle) is shut down by closing the
     * shared connection pool.
     *
     * @param ds the data source to shut down
     * @throws SQLException if closing fails
     */
    public void shutdownDataSource(DataSource ds) throws SQLException {
        if (ds instanceof BasicDataSource) {
            ((BasicDataSource) ds).close();
            return;
        }
        if (connectionPool == null) {
            return;
        }
        try {
            connectionPool.close();
        } catch (SQLException e) {
            throw e;
        } catch (Exception e) {
            throw new SQLException("Failed to close the connection pool", e);
        }
    }

    /**
     * Returns the column names of the most recent query.
     *
     * @return 1-based array of lower-cased column names (index 0 is unused), or
     *         {@code null} if no query has populated it yet
     */
    public String[] getColNames() {
        return colNames;
    }

    /**
     * Replaces the stored column names.
     *
     * @param colNames the column names to store
     */
    public void setColNames(String[] colNames) {
        this.colNames = colNames;
    }

    /* Closes the result set if present; a failure is logged and otherwise ignored. */
    private static void closeQuietly(ResultSet rs) {
        if (rs == null) {
            return;
        }
        try {
            rs.close();
        } catch (Exception e) {
            logger.error("Failed to close ResultSet", e);
        }
    }

    /* Closes the statement if present; a failure is logged and otherwise ignored. */
    private static void closeQuietly(Statement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        } catch (Exception e) {
            logger.error("Failed to close Statement", e);
        }
    }

    /* Closes the connection if present; a failure is logged and otherwise ignored. */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (Exception e) {
            logger.error("Failed to close Connection", e);
        }
    }
}

复制代码
SolrIndex.java 实现SolrJ的CRUD操作。

/**
 * Demonstrates basic SolrJ operations (index, query, delete) against the
 * {@code customer} core through one shared {@link HttpSolrServer}.
 */
public class SolrIndex {

    private static final Logger logger = LoggerFactory.getLogger(SolrIndex.class);

    /* URL of the "customer" core. */
    private static final String CORE_CUSTOMER_URL = "http://localhost:8088/solr/customer";

    /* Number of documents per add/commit round-trip in indexDocs(). */
    private static final int INDEX_BATCH_SIZE = 1000;

    /* Number of documents per add/commit round-trip in indexDocsWithDB(). */
    private static final int DB_BATCH_SIZE = 100;

    private static HttpSolrServer server;

    static {
        server = new HttpSolrServer(CORE_CUSTOMER_URL);
        server.setMaxRetries(1); // defaults to 0. > 1 not recommended.
        server.setConnectionTimeout(5000); // 5 seconds to establish TCP

        // Setting the XML response parser is only required for cross
        // version compatibility and only when one side is 1.4.1 or
        // earlier and the other side is 3.1 or later.
        server.setParser(new XMLResponseParser()); // binary parser is used by default

        // The following settings are provided here for completeness.
        // They will not normally be required, and should only be used
        // after consulting javadocs to know whether they are truly required.
        server.setSoTimeout(1000); // socket read timeout
        server.setDefaultMaxConnectionsPerHost(1000);
        server.setMaxTotalConnections(1000);
        server.setFollowRedirects(false); // defaults to false

        // allowCompression defaults to false.
        // Server side must support gzip or deflate for this to have any effect.
        server.setAllowCompression(true);
    }

    /**
     * Index a document with specified fields in doc.
     *
     * <p>The loop counter is used as the {@code customer_id} of each document.
     * Documents are sent and committed in batches; whatever is still buffered
     * after the loop is flushed at the end.
     *
     * @throws IOException propagated from the SolrJ add/commit calls
     * @throws SolrServerException propagated from the SolrJ add/commit calls
     */
    public void indexDocs() throws IOException, SolrServerException {
        // The response parser is already configured once in the static initializer.

        // Buffer of documents waiting to be sent to Solr.
        Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();

        /*
         * Original author's note: one thousand records took about an hour and
         * produced 660M; running concurrently with multiple threads would probably
         * be better.
         */
        for (int i = 10000000; i < 10000002; i++) {
            /* i is used as identification of a document, which is treated as unique key. */
            SolrInputDocument doc2 = new SolrInputDocument();
            doc2.addField("customer_id", i);
            doc2.addField("name", "John Natch-" + i);
            doc2.addField("level", "VIP");
            doc2.addField("sex", "男");
            doc2.addField("address", "【【【【【金刚金刚金刚金刚金刚金】】】】" + i);
            System.out.println("add doc " + i);
            docs.add(doc2);

            if (docs.size() == INDEX_BATCH_SIZE) {
                server.add(docs);
                server.commit();
                logger.info("commit 1000 doc " + i);
                docs.clear();
            }

            /*
             * To immediately commit after adding documents, you could use:
             *
             *   UpdateRequest req = new UpdateRequest();
             *   req.setAction( UpdateRequest.ACTION.COMMIT, false, false );
             *   req.add( docs );
             *   UpdateResponse rsp = req.process( server );
             */
        }

        // Flush the remainder. The buffer is empty when the document count is an
        // exact multiple of the batch size; skip the request then, because an
        // update request without documents carries no content (NOTE(review): Solr
        // is expected to reject such a request -- confirm for the SolrJ version
        // in use).
        if (!docs.isEmpty()) {
            server.add(docs);
            server.commit();
        }
        logger.info("Commits successfully!......");
    }

    /**
     * Integrates SolrJ with a database: builds an index from the rows of the
     * {@code customer} table. Solr DIH could be used instead.
     *
     * <p>Each row becomes one document whose field names are the lower-cased column
     * names reported by the data source.
     */
    public void indexDocsWithDB() {
        PoolingDataSourceDemo dataSource = new PoolingDataSourceDemo();
        List<List<Object>> rows = dataSource.executeQuerySQL("select * from customer");
        // Column names are stored 1-based (index 0 is unused), matching JDBC numbering.
        String[] columnNames = dataSource.getColNames();

        Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
        for (List<Object> row : rows) {
            SolrInputDocument doc = new SolrInputDocument();
            for (int col = 0; col < row.size(); col++) {
                // Row values are 0-based, column names 1-based.
                String fieldName = columnNames[col + 1];
                Object value = row.get(col);
                doc.addField(fieldName, value);
                logger.info(fieldName + "add filed " + value);
            }
            docs.add(doc);

            // Flush as soon as a full batch is buffered (the former "> 100" check
            // sent 101 documents per batch).
            if (docs.size() >= DB_BATCH_SIZE) {
                commitDocs(docs);
            }
        }

        if (docs.size() > 0) {
            commitDocs(docs);
        }
    }

    /**
     * Sends the given documents to Solr, commits them, and then empties the
     * collection so the caller can reuse it as a buffer.
     *
     * <p>Failures are logged and not rethrown; in that case the collection is left
     * untouched.
     *
     * @param docs documents to index; a {@code null} or empty collection is a no-op
     */
    private void commitDocs(Collection<SolrInputDocument> docs) {
        // Nothing to send: avoid an add/commit round-trip with no documents.
        if (docs == null || docs.isEmpty()) {
            return;
        }
        try {
            // To delete instead of add, call server.deleteById(...) here with the
            // ids of the documents to remove.
            server.add(docs);
            server.commit();
            docs.clear();
        } catch (SolrServerException e) {
            logger.error("SolrServerException", e);
        } catch (IOException e) {
            logger.error("IOException", e);
        }
    }

    /**
     * Query documents with specified query value.
     *
     * <p>Runs the query on the shared server, asks for the first 10 hits and logs
     * the {@code customer_id} and {@code name} of each returned document.
     *
     * @throws SolrServerException propagated from the SolrJ query call
     */
    public void queryDocs() throws SolrServerException {
        // Uses the shared, already configured server. A second HttpSolrServer was
        // previously created here on every call and never shut down.

        /* query statement settings */
        SolrQuery query = new SolrQuery();
        query.setQuery("李玲");
        query.setStart(0);
        query.setRows(10);

        QueryResponse response = server.query(query);
        SolrDocumentList documents = response.getResults();

        logger.info("id \t name");
        for (SolrDocument doc : documents) {
            logger.info(doc.getFieldValue("customer_id") + ":" + "\t" + doc.getFieldValue("name"));
        }
    }

    /**
     * Deletes every document of the core (query {@code *:*}) and commits.
     * Failures are logged and not rethrown.
     */
    public void delete() {
        try {
            server.deleteByQuery("*:*");
            server.commit();
        } catch (SolrServerException e) {
            logger.error("SolrServerException", e);
        } catch (IOException e) {
            logger.error("IOException", e);
        }
    }

    /**
     * Runs the indexing demo and prints the elapsed time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SolrIndex indexer = new SolrIndex();
        long startTime = System.currentTimeMillis();

        /* do index with specified documents */
        try {
            indexer.indexDocs();
        } catch (IOException e) {
            logger.error("IOException", e);
        } catch (SolrServerException e) {
            logger.error("SolrServerException", e);
        }

        // The other demos can be tried here instead: indexer.queryDocs(),
        // indexer.delete() or indexer.indexDocsWithDB() (original author's note:
        // the integration with the db takes 1214 ms).

        System.out.println("--------It takes " + (System.currentTimeMillis() - startTime) + " ms");
    }
}

复制代码

另外,SolrJ操作Solr Cloud的机制与HttpSolrServer一样,只是需要使用CloudSolrServer来代替HttpSolrServer。

<span style="color:#006600;">CloudSolrServer server = new CloudSolrServer("localhost:9983");

server.setDefaultCollection("collection1");</span>

SolrInputDocument doc = new SolrInputDocument();

doc.addField( "id", "1234");

doc.addField( "name", "A lovely summer holiday");

server.add(doc);

server.commit();

复制代码

运行代码前,请先加入下列依赖:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: