您的位置:首页 > 数据库 > Redis

用redis实现任务调度

2016-12-01 19:24 120 查看

--用redis实现任务调度--

[align=left] 一.因业务需求,最近写了个手机端的直播地址分配,由于服务器带宽有限,所以要控制最大在线直播人数,为了解决这个问题用了redis实现直播地址分配,直播有另外直播服务器处理, 该服务器只分配直播地址[/align]
[align=left]
[/align]
[align=left]1.点击直播,先进入直播任务队列中,返回一个定时器[/align]

public TimedTask toLiveTask(LiveInfo liveInfo) {
Jedis jedis = JedisUtils.getResource();
jedis.lpush(Constants.WAIT_LIVE_TASK_QUEUE, GsonUtils.toJson(liveInfo));
Timer timer = new Timer(true);
boolean flag = true;
TimedTask timedTask = new TimedTask(flag,liveInfo);
timer.schedule(timedTask, Global.WAIT_TIME);
jedis.close();
return timedTask;

}

[align=left]
[/align][align=left]2.直播进行中,如果当前在线直播人数小于最大直播数则分配直播地址,否则等待中,[/align]
@Transactional(readOnly = false)
public String liveOnTask(LiveInfo liveInfo,TimedTask timedTask) {

Jedis jedis = JedisUtils.getResource();
boolean flag = true;
Timer t = new Timer(true);
timedTask = new TimedTask(flag,liveInfo);
t.schedule(timedTask, Global.WAIT_TIME);//定时器开始

while(flag && timedTask.flag) {//任务还在
synchronized (this) {//并发处理
//判断当前直播总数是否小于最大直播总数,如果小于则分配直播地址,大于则继续等待
if(Integer.valueOf(jedis.get(Constants.ON_LIVE_MAX_SUM))<Global.MAX_ON_LIVE_SUM) {//判断当前直播总数

liveInfo.setBeginTime(new Date());//直播开始时间
liveInfo.setLiveUrl(IdGen.uuid());//分发直播地址

Transaction tx = jedis.multi();//redis事务

liveInfoDao.insert(liveInfo);

tx.sadd(Constants.ON_LIVE_TASK_QUEUE, GsonUtils.toJson(liveInfo));//加入直播任务queue
tx.incr(Constants.ON_LIVE_MAX_SUM);//直播人数增加
tx.exec();
jedis.close();
//System.out.println("返回直播地址:"+liveInfo.getLiveUrl());
flag = false;
timedTask.cancel();//直播地址分配成功,定时任务取消
return liveInfo.getLiveUrl();

}else {
flag = timedTask.flag;//继续等待标识
//System.out.println("等待中,继续监控任务");
}
}
}

jedis.close();
return null;

}

[align=left]3.退出直播 等待超时退出直播,和直播结束[/align]
@Transactional(readOnly = false)
public void liveOffTask(LiveInfo liveInfo){
Jedis jedis = JedisUtils.getResource();

if (liveInfo == null) return;
if(liveInfo.getLiveUrl().equals("") || liveInfo.getLiveUrl()==null){
jedis.lrem(Constants.WAIT_LIVE_TASK_QUEUE, 1, GsonUtils.toJson(liveInfo));//取消任务
jedis.close();
return ;
}
Transaction tx = jedis.multi();
liveInfo.setEndTime(new Date());
liveInfoDao.updateByPrimaryKey(liveInfo);//同步到数据库

tx.decr(Constants.ON_LIVE_MAX_SUM);//直播总数递减
tx.srem(Constants.ON_LIVE_TASK_QUEUE, GsonUtils.toJson(liveInfo));//清除直播任务
tx.exec();
jedis.close();
}

[align=left]
4.定时器
[/align]
public class TimedTask extends TimerTask{

public boolean flag;
private LiveInfo liveInfo;
public TimedTask(boolean flag,LiveInfo liveInfo) {
this.flag = flag;
this.liveInfo = liveInfo;
}
@Override
public void run() {
Jedis jedis = JedisUtils.getResource();
jedis.lrem(Constants.WAIT_LIVE_TASK_QUEUE, 1, GsonUtils.toJson(liveInfo));//等待任务队列
jedis.close();
flag = false;//取消等待标记
//System.out.println("等待超时了,任务结束"+ "flag="+flag);
}

}

[align=left]5.controller层 直播[/align]
@ResponseBody
@RequestMapping(value = "/getLiveUrl", method = RequestMethod.POST)
public String getLiveAddr(@RequestParam(value = "userId", required = false) String userId,
@RequestParam(value = "title", required = false) String title,
@RequestParam(value = "summary", requir
4000
ed = false) String summary,
@RequestParam(value = "location", required = false) String location){

LiveInfo liveInfo = new LiveInfo();
liveInfo.setUserId(userId);
liveInfo.setTitle(title);
liveInfo.setSummary(summary);
liveInfo.setLocation(location);
liveInfo.setBeginTime(new Date());
liveInfo.setLiveUrl(IdGen.uuid());

return liveTaskService.liveOnTask(liveInfo, liveTaskService.toLiveTask(liveInfo));

}

[align=left]6. 退出直播[/align]
@ResponseBody
@RequestMapping(value = "/liveOut", method = RequestMethod.GET)
public void liveOut(@RequestParam(value = "liveUrl", required = false) String liveUrl) {

LiveInfo liveInfo = liveInfoService.getLiveInfoByLiveUrl(liveUrl);
if(liveInfo==null) {
throw new PhoneException(HttpStatus.BAD_REQUEST, new Error(400, "直播地址错误!"));
}
liveTaskService.liveOffTask(liveInfo);

}


7.初始化最大直播总数省略(项目启动初始化redis的最大直播总数)

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: