您的位置:首页 > 理论基础 > 计算机网络

httpclient4.5+disruptor3.3.2+mongodb3.2+jsoup写的一个抓取小程序

2016-01-14 17:54 525 查看
最近由于工作上接手了一个storm的系统,storm采用dubbo接口接收数据,存队列,这里队列用的是jdk自带的阻塞队列ArrayBlockQueue,在做了几轮压测之后发现ArrayBlockQueue队列存在性能瓶颈。经老大介绍可以尝试disruptor;

disruptor是一个开源的无锁队列,性能非常强悍,具体强悍到什么地步大家可以自测一下;

但disruptor还是不适用于我们的系统场景,因为在我们的storm架构里是主动去队列里拿消息消费,而disruptor是被动执行,简单的说就是队列对每一个存在里面的消息都会有一个监听,即被动消费消息;总之后来没用成;

还是说正题吧,后来就自己写一个小的抓取程序去研究一下这个性能强大的无锁队列;

稍微思考了一下,抓取肯定需要httpclient,还有网页解析就用jsoup吧,存储就用mongodb吧;

这才发现httpclient和mongodb都更新了,httpclient不多说,想说的是mongodb,上次用mongo还是2.0时代,现在已经更新到3.2了。

直接上代码了

这里就上MongoDB3.2的代码了,主要是封装了一个基本的工具类:

package com.jiangjun.crawler.mongodb;

import com.jiangjun.crawler.Constant;
import com.mongodb.*;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.util.List;
import java.util.Map;

/**
* Created by 15061760 on 2015/12/28 0028.
*/
public abstract class AbstractMongoDBCache {

    /**
     * Shared MongoDB client and database handle, initialized once in the
     * static block below. Kept public static because subclasses and other
     * code in this project reference them directly.
     */
    public static MongoClient mongo = null;
    public static MongoDatabase db = null;

    static {
        MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
        builder.connectionsPerHost(50);                           // max connections per host
        builder.threadsAllowedToBlockForConnectionMultiplier(50); // max threads waiting per connection
        builder.maxWaitTime(1000 * 60 * 2);                       // 2 min max wait for a free connection
        builder.connectTimeout(1000 * 60 * 1);                    // 1 min TCP connect timeout
        MongoClientOptions options = builder.build();
        ServerAddress serverAddress = new ServerAddress("127.0.0.1", 27017);
        mongo = new MongoClient(serverAddress, options);
        db = mongo.getDatabase(Constant.MONGO_DB);
    }

    /**
     * Closes the shared client and clears both handles.
     * NOTE: method name kept as "destory" (sic) for backward compatibility.
     */
    public static void destory() {
        if (mongo != null) {
            mongo.close();
            mongo = null; // fix: was left non-null, so a repeated call closed the client twice
            db = null;
        }
    }

    /**
     * Returns the collection with the given name.
     *
     * @param name collection name
     * @return the collection handle
     */
    public abstract MongoCollection getCollectionByName(String name);

    /**
     * Finds records in the named collection matching all key/value pairs.
     *
     * @param paramMap equality conditions (field name -> value), ANDed together
     * @param name     collection name
     * @return a cursor over the matching documents
     */
    public abstract MongoCursor findOneByParam(Map<String, Object> paramMap, String name);

    /**
     * Returns a cursor over every record in the named collection.
     *
     * @param name collection name
     * @return a cursor over all documents
     */
    public abstract MongoCursor queryCollectionByName(String name);

    /**
     * Inserts a single document into the named collection.
     *
     * @param document       document to insert
     * @param collectionName target collection
     */
    public abstract void save(Document document, String collectionName);

    /**
     * Inserts a batch of documents into the named collection.
     *
     * @param objectList     documents to insert
     * @param collectionName target collection
     */
    public abstract void saveBatch(List<Document> objectList, String collectionName);

    /**
     * Deletes one record matching all key/value pairs from the named collection.
     *
     * @param paramMap       equality conditions (field name -> value), ANDed together
     * @param collectionName target collection
     */
    public abstract void removeByParam(Map<String, Object> paramMap, String collectionName);

    /**
     * Drops an entire collection.
     *
     * @param collectionName collection to drop
     */
    public abstract void dropCollection(String collectionName);

}
package com.jiangjun.crawler.mongodb;

import com.jiangjun.crawler.Constant;
import com.jiangjun.crawler.bean.NetEaseDaDa;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import org.bson.BSON;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.*;
import java.util.logging.Filter;

/**
* Created by 15061760 on 2015/12/28 0028.
*/
public class MongoDBSupport extends AbstractMongoDBCache {

    /** Eagerly-initialized singleton instance. */
    private static MongoDBSupport mongoDBSupport = new MongoDBSupport();

    private MongoDBSupport() {
    }

    /** @return the shared singleton instance */
    public static MongoDBSupport getInstance() {
        return mongoDBSupport;
    }

    /**
     * Builds an AND filter from the map's key/value equality conditions.
     * Fix: the original cast every value to String, which threw
     * ClassCastException for non-String values; Filters.eq accepts any Object.
     * An empty map matches everything (Filters.and rejects empty input).
     */
    private static Bson buildAndFilter(Map<String, Object> paramMap) {
        List<Bson> filters = new ArrayList<Bson>();
        for (Map.Entry<String, Object> entry : paramMap.entrySet()) {
            filters.add(Filters.eq(entry.getKey(), entry.getValue()));
        }
        return filters.isEmpty() ? new Document() : Filters.and(filters);
    }

    @Override
    public MongoCollection getCollectionByName(String name) {
        return db.getCollection(name);
    }

    @Override
    public MongoCursor queryCollectionByName(String name) {
        return db.getCollection(name).find().iterator();
    }

    @Override
    public void save(Document document, String collectionName) {
        db.getCollection(collectionName).insertOne(document);
    }

    @Override
    public void saveBatch(List<Document> objectList, String collectionName) {
        db.getCollection(collectionName).insertMany(objectList);
    }

    @Override
    public void dropCollection(String collectionName) {
        db.getCollection(collectionName).drop();
    }

    @Override
    public void removeByParam(Map<String, Object> paramMap, String collectionName) {
        // The pointless anonymous Iterable wrapper is gone: a List is already Iterable.
        db.getCollection(collectionName).deleteOne(buildAndFilter(paramMap));
    }

    @Override
    public MongoCursor findOneByParam(Map<String, Object> paramMap, String name) {
        return db.getCollection(name).find(buildAndFilter(paramMap)).iterator();
    }

    /** Ad-hoc smoke test against a local MongoDB instance. */
    public static void main(String[] args) {
        MongoDBSupport mongoDBSupport = MongoDBSupport.getInstance();
        MongoCursor cursor = mongoDBSupport.findOneByParam(new HashMap<String, Object>() {
            {
                put("link", "http://d.news.163.com/article/BD9L573200014TUH");
                put("url", "http://d.news.163.com/articlesPage/new");
            }
        }, Constant.MONGO_NETEASE);
        try {
            if (cursor.hasNext()) {
                System.out.println(cursor.next().toString());
            }
        } finally {
            cursor.close(); // fix: cursor was leaked
        }
    }
}

另外附上disruptor的代码:
package com.jiangjun.crawler.disruptor;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.jiangjun.crawler.bean.UrlEvent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by 15061760 on 2015/12/26 0026.
*/
public class Disrupter4crawler {
static EventFactory<UrlEvent> urlEventFactory = null;
static ExecutorService executorService = null;
final static int ringBufferSize = 2 * 1024;
static Disruptor<UrlEvent> disruptor4Wait = null;
static Disruptor<UrlEvent> disruptor4Complete = null;
static EventHandler<UrlEvent>
cdbc
eventHandler = null;

static {
urlEventFactory = new UrlEventFactory();
executorService = Executors.newCachedThreadPool();
disruptor4Wait = new Disruptor<UrlEvent>(urlEventFactory, ringBufferSize, executorService);
disruptor4Complete = new Disruptor<UrlEvent>(urlEventFactory, ringBufferSize, executorService);
eventHandler = new UrlEventHandler();
disruptor4Wait.handleEventsWith(eventHandler);
disruptor4Complete.handleEventsWith(eventHandler);
disruptor4Wait.start();
disruptor4Complete.start();
}

public static void offer(UrlEvent urlEvent) {
int flag = urlEvent.getFlag();
switch (flag) {
case 0:
RingBuffer<UrlEvent> ringBuffer_wait = disruptor4Wait.getRingBuffer();
long sequenceWait = ringBuffer_wait.next();
try {
UrlEvent ueWait = ringBuffer_wait.get(sequenceWait);
ueWait.setUrl(urlEvent.getUrl());
ueWait.setDes(urlEvent.getDes());
ueWait.setFlag(urlEvent.getFlag());
}finally {
ringBuffer_wait.publish(sequenceWait);
}
break;
case 1:
RingBuffer<UrlEvent> ringBuffer_complete = disruptor4Complete.getRingBuffer();
long sequenceComplete = ringBuffer_complete.next();
try {
UrlEvent ueComplete = ringBuffer_complete.get(sequenceComplete);
ueComplete.setUrl(urlEvent.getUrl());
ueComplete.setDes(urlEvent.getDes());
ueComplete.setFlag(urlEvent.getFlag());
}finally {
ringBuffer_complete.publish(sequenceComplete);
}
break;
default:
break;
}
}
}
package com.jiangjun.crawler.disruptor;

import com.lmax.disruptor.EventFactory;
import com.jiangjun.crawler.bean.UrlEvent;

/**
* Created by 15061760 on 2015/12/26 0026.
*/
public class UrlEventFactory implements EventFactory<UrlEvent>{
public UrlEvent newInstance() {
return new UrlEvent();
}
}

package com.jiangjun.crawler.disruptor;

import com.jiangjun.crawler.filter.SaveFilter;
import com.lmax.disruptor.EventHandler;
import com.jiangjun.crawler.bean.UrlEvent;
import com.jiangjun.crawler.filter.DocumentParseFilter;
import com.jiangjun.crawler.filter.HttpClientFilter;
import org.bson.Document;

/**
* Created by 15061760 on 2015/12/26 0026.
*/
public class UrlEventHandler implements EventHandler<UrlEvent> {
public void onEvent(UrlEvent urlEvent, long sequence, boolean endOfBatch) throws Exception {
if (urlEvent.getFlag() == 0) {
//走抓取流程
HttpClientFilter.getInstance().setFilter(DocumentParseFilter.getInstance());
DocumentParseFilter.getInstance().setFilter(SaveFilter.getInstance());
HttpClientFilter.getInstance().doProcess(urlEvent);
}
}
}


package com.jiangjun.crawler;

import com.jiangjun.crawler.bean.UrlEvent;
import com.jiangjun.crawler.disruptor.Disrupter4crawler;

import java.util.ArrayList;
import java.util.List;

/**
* Created by 15061760 on 2015/12/29 0029.
*/
public class Main {

    /** Seed URLs fed to the disruptor at startup; flag 0 marks "to be crawled". */
    static List<UrlEvent> urlEvents = new ArrayList<UrlEvent>();

    static {
        UrlEvent urlEvent = new UrlEvent();
        urlEvent.setUrl("http://baike.baidu.com/cms/home/eventsOnHistory/12.json?_=1451271757920");
        urlEvent.setDes("百度百科");
        urlEvent.setFlag(0);

        UrlEvent urlEvent2 = new UrlEvent();
        urlEvent2.setUrl("http://d.news.163.com/articlesPage/new");
        urlEvent2.setDes("网易哒哒-新");
        urlEvent2.setFlag(0);

        urlEvents.add(urlEvent);
        urlEvents.add(urlEvent2);
    }

    /**
     * Entry point: pushes every seed URL onto the crawl disruptor.
     * NOTE(review): Disrupter4crawler's cached thread pool is never shut
     * down, so the JVM will not exit on its own after the crawl finishes —
     * an explicit shutdown hook would be needed to fix that.
     */
    public static void main(String[] args) {
        System.out.println("crawler");
        for (UrlEvent u : urlEvents) {
            Disrupter4crawler.offer(u);
        }
    }
}
 
代码也没用心推敲,主要是mongoDB的API弃用了2.0时候的方法,看了3.2的API写了个小工具类,还有待完善和扩充。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mongodb disruptor