实现一个通用的计数器组件
2017-02-28 00:00
441 查看
如今的因特网上有着各种各样的资源和服务,而常见的互联网应用,无论是电商、社交,还是内容网站,都有一个离不开的功能,那就是资源计数,浏览量,点赞量,评论量,收藏量,销量,等等。每种计数都是由不同的动作触发,都需要进行数据库对应字段+1的操作,但是对于大型分布式应用,这种单一的操作代码和众多模块中的业务代码如果相耦合,无论是开发还是调试都会成为痛点。而且对于用户基数庞大的互联网应用,如果仅仅单纯的使用数据库update的方式,那像浏览量这种高频次的用户行为,数据库难免会因过载而产生瓶颈。
根据这种需求,实现了基于REDIS + ACTIVEMQ的资源计数组件。
1.先定义统计量类型
justInTime表示该类型的消息是否需要即时持久化到数据库,对于评论量,销量等频次较低,并发较小的用户行为,就可以直接使用update更新数据库,但是对于浏览量等高并发行为,就不能即时更新数据库,而是读取和写入都使用REDIS缓存,然后使用定时任务同步redis的数据到数据库中。
2.定义统计量对象
3.通用的Counter服务
4.从消息队列中接受消息并处理counter任务
5.最后再写定时任务,同步redis中的数据到数据库中
这样就实现了一个通用的资源计数组件,比如要增加商品销量就可以这样写
根据这种需求,实现了基于REDIS + ACTIVEMQ的资源计数组件。
1.先定义统计量类型
/** * Created by wuqiang on 2016/11/27. * 计数类型,浏览量,收藏量,售量,评论量,点赞量等等 */ public enum CountType { view_count(false), favorite_count(true), sales_count(true), comment_count(true), upvote_count(false); //统计量是否即时同步到数据库,即时数据使用mq同步,非即时数据使用定时任务同步 private boolean justInTime; private CountType(boolean justInTime){ this.justInTime = justInTime; } public boolean isJustInTime() { return justInTime; } }
justInTime表示该类型的消息是否需要即时持久化到数据库,对于评论量,销量等频次较低,并发较小的用户行为,就可以直接使用update更新数据库,但是对于浏览量等高并发行为,就不能即时更新数据库,而是读取和写入都使用REDIS缓存,然后使用定时任务同步redis的数据到数据库中。
2.定义统计量对象
public class CountIncrementModel implements Serializable { public CountIncrementModel( CountType countType, int countAction, String entityType, Long entityId) { this.countType = countType; this.countAction = countAction; this.entityType = entityType; this.entityId = entityId; } private CountType countType;//统计量类型 private int countAction;//计数动作 1:+ -1:- private String entityType;//资源类型,用权限定类名表示 private Long entityId;//资源ID public String getEntityType() { return entityType; } public void setEntityType(String entityType) { this.entityType = entityType; } public Long getEntityId() { return entityId; } public void setEntityId(Long entityId) { this.entityId = entityId; } public CountType getCountType() { return countType; } public void setCountType(CountType countType) { this.countType = countType; } public int getCountAction() { return countAction; } public void setCountAction(int countAction) { this.countAction = countAction; } }
3.通用的Counter服务
@Component public class Counter { @Autowired private Notifier notifier; /** * 统计量递增,无论哪种统计量都先处理缓存,然后根据是否及时持久化,决定采用异步还是定时任务 * @param countType 统计量类型 * @param entityType 资源实体类型 * @param entityId 资源ID * @throws Exception */ public void increment(CountType countType, String entityType, Long entityId) throws Exception{ if(ValidationUtil.isEmpty(countType) || ValidationUtil.isEmpty(entityType) || ValidationUtil.isEmpty(entityId)) return; Class entityClass = null; try{entityClass = Class.forName(entityType);}catch (ClassNotFoundException e){} if(entityClass != null){ Table table = (Table)entityClass.getAnnotation(Table.class); if(table != null){ Redis.increment(1L,countType.name(),table.name(),entityId.toString()); if(countType.isJustInTime()){ notifier.notify( new CountIncrement(new CountIncrementModel(countType,1,table.name(),entityId)), CountIncrement.class.getSimpleName() ); } } } } /** * 统计量递减,无论哪种统计量都先处理缓存,然后根据是否及时持久化,决定采用异步还是定时任务 * @param countType 统计量类型 * @param entityType 资源实体类型 * @param entityId 资源ID * @throws Exception */ public void decrement(CountType countType,String entityType,Long entityId) throws Exception{ if(ValidationUtil.isEmpty(countType) || ValidationUtil.isEmpty(entityType) || ValidationUtil.isEmpty(entityId)) return; Class entityClass = null; try{entityClass = Class.forName(entityType);}catch (ClassNotFoundException e){} if(entityClass != null){ Table table = (Table)entityClass.getAnnotation(Table.class); if(table != null){ Redis.increment(-1L,countType.name(),table.name(),entityId.toString()); if(countType.isJustInTime()){ notifier.notify(new CountIncrement(new CountIncrementModel(countType,-1,table.name(),entityId)), CountIncrement.class.getSimpleName() ); } } } } } public class Notifier { @Autowired private JmsTemplate jmsTemplate; /** * TODO.给指定的队列发送JMS通知 * @param notification * @param destinationName * @throws Exception */ public void notify(Notification notification,String destinationName) throws Exception{ if(ValidationUtil.isEmpty(notification) || ValidationUtil.isEmpty(destinationName)) return; notification.id(sequenceGenerator.getNextValue()); notification.createTime(new Date()); Destination destination = notification.isTopic() ? new ActiveMQTopic(destinationName) : new ActiveMQQueue(destinationName); this.executeSend(notification, destination); } private void executeSend(final Serializable message, Destination destination) { this.jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(message); return objectMessage; } }); } } public interface Notification extends Serializable{ Long id(); void id(Long id); Date createTime(); void createTime(Date createTime); Object body(); boolean isTopic(); boolean needPersist(); int retryTimes(); } public abstract class AbstractNotification implements Notification{ private static final long serialVersionUID = 1L; protected Long id; protected Date createTime; protected Object body; public AbstractNotification(Object body) { this.body = body; } @Override public Long id() { return this.id; } @Override public Date createTime() { return this.createTime; } @Override public Object body() { return this.body; } @Override public void id(Long id){ this.id = id; } @Override public void createTime(Date createTime){ this.createTime = createTime; } } public class CountIncrement extends AbstractNotification{ public CountIncrement(CountIncrementModel countIncrementModel){ super(countIncrementModel); } @Override public boolean isTopic() { return false; } @Override public boolean needPersist() { return false; } @Override public int retryTimes() { return 1; } }
4.从消息队列中接受消息并处理counter任务
public class CountIncrementHandler extends JMSMessageHandler { private static final String COUNT_INCREMENT_LOCK_KEY = "count_increment_lock_key"; @Override protected void handleMessage(Notification notification) throws Exception { CountIncrementModel countIncrementModel = (CountIncrementModel)notification.body(); if(!ValidationUtil.isEmpty(countIncrementModel)){ final CountType countType = countIncrementModel.getCountType(); final int countAction = countIncrementModel.getCountAction(); final String entityType = countIncrementModel.getEntityType(); final Long entityId = countIncrementModel.getEntityId(); String key = COUNT_INCREMENT_LOCK_KEY + countType.name() + entityType + entityId.toString(); //使用锁机制保证多线程更新数据的安全 ConcurrentUtil.runWithLock(key, new Callable<String>() { @Override public String call() throws Exception { Map<String,Object> params = new HashMap<String,Object>(); params.put("id",entityId); DB.update("update " + entityType + " set " + countType.name() + " = " + countType.name() + (countAction > 0 ? "+" : "-") +" 1 where id=:id",params); return null; } } ); } } } public class ConcurrentUtil { /** * TODO.以全局分布式锁的方式执行操作 * @param entityKey 全局锁的Key * @param callee * @return * @throws InterruptedException * @throws Exception */ public static <T> T runWithLock(String entityKey,Callable<T> callee) throws InterruptedException, Exception{ lock(entityKey); T result = null; try { result = callee.call(); } finally{ unlock(entityKey); } return result; } /** * TODO.获取全局锁 * @param entityKey 全局锁的Key * @throws InterruptedException * @throws Exception */ private static void lock(String entityKey) throws InterruptedException, Exception { while (true) { if (Redis.setNXWithExpire(entityKey, "monitor", CoreConstants.OPERATION_TOKEN_EXPIRE)) { return; } else { Thread.sleep(10); } } } /** * TODO.释放全局锁 * @param entityKey 全局锁的Key * @throws Exception */ private static void unlock(String entityKey) throws Exception{ Redis.remove(entityKey); } }
5.最后再写定时任务,同步redis中的数据到数据库中
这样就实现了一个通用的资源计数组件,比如要增加商品销量就可以这样写
counter.increment(CountType.sales_count,Product.class.getName(),productId);
相关文章推荐
- 通用权限管理系统组件 (GPM - General Permissions Manager) 中后一个登录的把前一个登录的踢掉功能的实现
- 通用权限管理系统组件 中后一个登录的把前一个登录的踢掉功能的实现
- 设计并实现用于ASP.NET一个通用的数据存取层应用程序 原作者 Paul Abarham 翻译 cwxiao888@163.com
- 有关打印、收藏等的JS代码(打印等主要使用了一个IE组件来实现)
- 用Remoting 实现一个文件传输组件
- ACE实现了一个通用的架构通信程序
- 写一个通用数据访问组件
- 我最近写了一个通用的ASP实现文件缓存的小CLASS。
- 一个通用的调用shell命令及打印退出状态的C语言实现
- 写一个通用数据访问组件
- 一个通用的DAO接口及其Hibernate3实现
- 用VB写的一个组件,实现添加系统用户,并添加到指定组
- 有关打印、收藏等的JS代码(打印等主要使用了一个IE组件来实现)
- 实现图形JSF组件很简单地构建一个纯HTML无法轻松实现的图形Web应用程序组件
- 一个通用的DAO接口及其Hibernate3实现(转)
- 写一个通用数据访问组件
- 写一个CopyOnWrite的通用实现(C++)
- 实现图形JSF组件很简单地构建一个纯HTML无法轻松实现的图形Web应用程序组件
- 采用C++的ACE库实现的一个通用的C/S架构通信程序
- 设计、实现一个 Asp.Net 应用的通用数据存取层(二)