您的位置:首页 > 其它

一种高并发流控程序的简单轻量实现

2013-04-17 16:21 211 查看
实现一个流控程序,控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用。

[java]viewplaincopyprint?

importjava.util.Date;

importjava.util.concurrent.ExecutorService;

importjava.util.concurrent.Executors;

importjava.util.concurrent.Semaphore;

importjava.util.concurrent.TimeUnit;

importjava.util.concurrent.atomic.AtomicInteger;

publicclassFlowConcurrentController{

//每秒并发访问控制数量

finalstaticintMAX_QPS=10;

//并发控制信号量

finalstaticSemaphoresemaphore=newSemaphore(MAX_QPS);

//监控每秒并发访问次数(理论上accessCount.get()<=10)

finalstaticAtomicIntegeraccessCount=newAtomicInteger(0);

//模拟远程访问

privatestaticvoidremoteCall(inti,intj){

System.out.println(String.format("%s-%s:%d%d",newDate(),Thread.currentThread(),i,j));

}

privatestaticvoidreleaseWork(){//每秒release一次

//releasesemaphorethread

Executors.newScheduledThreadPool(1).scheduleAtFixedRate(newRunnable(){

@Override

publicvoidrun(){

semaphore.release(accessCount.get());

accessCount.set(0);

}

},1000,1000,TimeUnit.MILLISECONDS);

}

//模拟并发访问控制

privatestaticvoidsimulateAccess(finalintm,finalintn)

throwsException{//m:线程数;n:调用数

ExecutorServicepool=Executors.newFixedThreadPool(100);

for(inti=m;i>0;i--){

finalintx=i;

pool.submit(newRunnable(){

@Override

publicvoidrun(){

for(intj=n;j>0;j--){

try{

Thread.sleep(5);

}catch(InterruptedExceptione){

e.printStackTrace();

}

semaphore.acquireUninterruptibly(1);

accessCount.incrementAndGet();

remoteCall(x,j);

}

}

});

}

pool.shutdown();

pool.awaitTermination(1,TimeUnit.HOURS);

}

publicstaticvoidmain(String[]args)throwsException{

//开启releaseWork

releaseWork();

//开始模拟lotsofconcurrentcalls:100*1000

simulateAccess(100,1000);

}

}


上面的代码中存在一个小问题,就是accessCount的释放后,存在负数的情况,也就是说高并发的情况下每秒会存在>MAX_QPS次的并发访问次数,还不能做到非常精确控制。

期待大家更加简单和轻量的方式。

转自:http://www.cdtarena.com/javapx/201304/8327.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: