您的位置:首页 > 其它

实现一个通用的计数器组件

2017-02-28 00:00 441 查看
如今的因特网上有着各种各样的资源和服务,而常见的互联网应用,无论是电商、社交,还是内容网站,都有一个离不开的功能,那就是资源计数,浏览量,点赞量,评论量,收藏量,销量,等等。每种计数都是由不同的动作触发,都需要进行数据库对应字段+1的操作,但是对于大型分布式应用,这种单一的操作代码和众多模块中的业务代码如果相耦合,无论是开发还是调试都会成为痛点。而且对于用户基数庞大的互联网应用,如果仅仅单纯的使用数据库update的方式,那像浏览量这种高频次的用户行为,数据库难免会因过载而产生瓶颈。

根据这种需求,实现了基于REDIS + ACTIVEMQ的资源计数组件。

1.先定义统计量类型

/**
* Created by wuqiang on 2016/11/27.
* 计数类型,浏览量,收藏量,售量,评论量,点赞量等等
*/
public enum CountType {
view_count(false),
favorite_count(true),
sales_count(true),
comment_count(true),
upvote_count(false);

//统计量是否即时同步到数据库,即时数据使用mq同步,非即时数据使用定时任务同步
private boolean justInTime;

private CountType(boolean justInTime){
this.justInTime = justInTime;
}

public boolean isJustInTime() {
return justInTime;
}
}

justInTime表示该类型的消息是否需要即时持久化到数据库,对于评论量,销量等频次较低,并发较小的用户行为,就可以直接使用update更新数据库,但是对于浏览量等高并发行为,就不能即时更新数据库,而是读取和写入都使用REDIS缓存,然后使用定时任务同步redis的数据到数据库中。

2.定义统计量对象

public class CountIncrementModel implements Serializable {

public CountIncrementModel(
CountType countType, int countAction, String entityType, Long entityId) {
this.countType = countType;
this.countAction = countAction;
this.entityType = entityType;
this.entityId = entityId;
}

private CountType countType;//统计量类型
private int countAction;//计数动作 1:+ -1:-
private String entityType;//资源类型,用权限定类名表示
private Long entityId;//资源ID

public String getEntityType() {
return entityType;
}

public void setEntityType(String entityType) {
this.entityType = entityType;
}

public Long getEntityId() {
return entityId;
}

public void setEntityId(Long entityId) {
this.entityId = entityId;
}

public CountType getCountType() {
return countType;
}

public void setCountType(CountType countType) {
this.countType = countType;
}

public int getCountAction() {
return countAction;
}

public void setCountAction(int countAction) {
this.countAction = countAction;
}
}

3.通用的Counter服务

@Component
public class Counter {

@Autowired
private Notifier notifier;

/**
* 统计量递增,无论哪种统计量都先处理缓存,然后根据是否及时持久化,决定采用异步还是定时任务
* @param countType 统计量类型
* @param entityType 资源实体类型
* @param entityId 资源ID
* @throws Exception
*/
public void increment(CountType countType, String entityType, Long entityId) throws Exception{
if(ValidationUtil.isEmpty(countType)
|| ValidationUtil.isEmpty(entityType)
|| ValidationUtil.isEmpty(entityId))
return;
Class entityClass = null;
try{entityClass = Class.forName(entityType);}catch (ClassNotFoundException e){}
if(entityClass != null){
Table table = (Table)entityClass.getAnnotation(Table.class);
if(table != null){
Redis.increment(1L,countType.name(),table.name(),entityId.toString());
if(countType.isJustInTime()){
notifier.notify(
new CountIncrement(new CountIncrementModel(countType,1,table.name(),entityId)),
CountIncrement.class.getSimpleName()
);
}
}
}
}

/**
* 统计量递减,无论哪种统计量都先处理缓存,然后根据是否及时持久化,决定采用异步还是定时任务
* @param countType 统计量类型
* @param entityType 资源实体类型
* @param entityId 资源ID
* @throws Exception
*/
public void decrement(CountType countType,String entityType,Long entityId) throws Exception{
if(ValidationUtil.isEmpty(countType)
|| ValidationUtil.isEmpty(entityType)
|| ValidationUtil.isEmpty(entityId))
return;
Class entityClass = null;
try{entityClass = Class.forName(entityType);}catch (ClassNotFoundException e){}
if(entityClass != null){
Table table = (Table)entityClass.getAnnotation(Table.class);
if(table != null){
Redis.increment(-1L,countType.name(),table.name(),entityId.toString());
if(countType.isJustInTime()){
notifier.notify(new CountIncrement(new CountIncrementModel(countType,-1,table.name(),entityId)),
CountIncrement.class.getSimpleName()
);
}
}
}
}
}

public class Notifier {

@Autowired
private JmsTemplate jmsTemplate;

/**
* TODO.给指定的队列发送JMS通知
* @param notification
* @param destinationName
* @throws Exception
*/
public void notify(Notification notification,String destinationName) throws Exception{
if(ValidationUtil.isEmpty(notification) || ValidationUtil.isEmpty(destinationName)) return;
notification.id(sequenceGenerator.getNextValue());
notification.createTime(new Date());

Destination destination = notification.isTopic()
? new ActiveMQTopic(destinationName) : new ActiveMQQueue(destinationName);
this.executeSend(notification, destination);
}

private void executeSend(final Serializable message, Destination destination) {
this.jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage(message);
return objectMessage;
}
});
}
}

public interface Notification extends Serializable{

Long id();

void id(Long id);

Date createTime();

void createTime(Date createTime);

Object body();

boolean isTopic();

boolean needPersist();

int retryTimes();
}

public abstract class AbstractNotification implements Notification{

private static final long serialVersionUID = 1L;
protected Long id;
protected Date createTime;
protected Object body;

public AbstractNotification(Object body) {
this.body = body;
}

@Override
public Long id() {
return this.id;
}

@Override
public Date createTime() {
return this.createTime;
}

@Override
public Object body() {
return this.body;
}

@Override
public void id(Long id){
this.id = id;
}

@Override
public void createTime(Date createTime){
this.createTime = createTime;
}
}

public class CountIncrement extends AbstractNotification{

public CountIncrement(CountIncrementModel countIncrementModel){
super(countIncrementModel);
}

@Override
public boolean isTopic() {
return false;
}

@Override
public boolean needPersist() {
return false;
}

@Override
public int retryTimes() {
return 1;
}
}

4.从消息队列中接受消息并处理counter任务

public class CountIncrementHandler extends JMSMessageHandler {

private static final String COUNT_INCREMENT_LOCK_KEY = "count_increment_lock_key";

@Override
protected void handleMessage(Notification notification) throws Exception {
CountIncrementModel countIncrementModel = (CountIncrementModel)notification.body();
if(!ValidationUtil.isEmpty(countIncrementModel)){
final CountType countType = countIncrementModel.getCountType();
final int countAction = countIncrementModel.getCountAction();
final String entityType = countIncrementModel.getEntityType();
final Long entityId = countIncrementModel.getEntityId();
String key = COUNT_INCREMENT_LOCK_KEY + countType.name() + entityType + entityId.toString();
//使用锁机制保证多线程更新数据的安全
ConcurrentUtil.runWithLock(key,
new Callable<String>() {
@Override
public String call() throws Exception {
Map<String,Object> params = new HashMap<String,Object>();
params.put("id",entityId);
DB.update("update " + entityType + " set "
+ countType.name() + " = " + countType.name() + (countAction > 0 ? "+" : "-") +" 1 where id=:id",params);
return null;
}
}
);
}
}
}

public class ConcurrentUtil
{
/**
* TODO.以全局分布式锁的方式执行操作
* @param entityKey    全局锁的Key
* @param callee
* @return
* @throws InterruptedException
* @throws Exception
*/
public static <T> T runWithLock(String entityKey,Callable<T> callee)
throws InterruptedException, Exception{
lock(entityKey);
T result = null;
try {
result = callee.call();
} finally{
unlock(entityKey);
}
return result;
}
/**
* TODO.获取全局锁
* @param entityKey    全局锁的Key
* @throws InterruptedException
* @throws Exception
*/
private static void lock(String entityKey) throws InterruptedException, Exception {
while (true) {
if (Redis.setNXWithExpire(entityKey, "monitor", CoreConstants.OPERATION_TOKEN_EXPIRE)) {
return;
} else {
Thread.sleep(10);
}
}
}
/**
* TODO.释放全局锁
* @param entityKey    全局锁的Key
* @throws Exception
*/
private static void unlock(String entityKey) throws Exception{
Redis.remove(entityKey);
}
}

5.最后再写定时任务,同步redis中的数据到数据库中

这样就实现了一个通用的资源计数组件,比如要增加商品销量就可以这样写

counter.increment(CountType.sales_count,Product.class.getName(),productId);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: