一种高并发流控程序的简单轻量实现
2013-04-17 16:21
211 查看
实现一个流控程序,控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用。
[java]viewplaincopyprint?
importjava.util.Date;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Semaphore;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicInteger;
publicclassFlowConcurrentController{
//每秒并发访问控制数量
finalstaticintMAX_QPS=10;
//并发控制信号量
finalstaticSemaphoresemaphore=newSemaphore(MAX_QPS);
//监控每秒并发访问次数(理论上accessCount.get()<=10)
finalstaticAtomicIntegeraccessCount=newAtomicInteger(0);
//模拟远程访问
privatestaticvoidremoteCall(inti,intj){
System.out.println(String.format("%s-%s:%d%d",newDate(),Thread.currentThread(),i,j));
}
privatestaticvoidreleaseWork(){//每秒release一次
//releasesemaphorethread
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(newRunnable(){
@Override
publicvoidrun(){
semaphore.release(accessCount.get());
accessCount.set(0);
}
},1000,1000,TimeUnit.MILLISECONDS);
}
//模拟并发访问控制
privatestaticvoidsimulateAccess(finalintm,finalintn)
throwsException{//m:线程数;n:调用数
ExecutorServicepool=Executors.newFixedThreadPool(100);
for(inti=m;i>0;i--){
finalintx=i;
pool.submit(newRunnable(){
@Override
publicvoidrun(){
for(intj=n;j>0;j--){
try{
Thread.sleep(5);
}catch(InterruptedExceptione){
e.printStackTrace();
}
semaphore.acquireUninterruptibly(1);
accessCount.incrementAndGet();
remoteCall(x,j);
}
}
});
}
pool.shutdown();
pool.awaitTermination(1,TimeUnit.HOURS);
}
publicstaticvoidmain(String[]args)throwsException{
//开启releaseWork
releaseWork();
//开始模拟lotsofconcurrentcalls:100*1000
simulateAccess(100,1000);
}
}
上面的代码中存在一个小问题,就是accessCount的释放后,存在负数的情况,也就是说高并发的情况下每秒会存在>MAX_QPS次的并发访问次数,还不能做到非常精确控制。
期待大家更加简单和轻量的方式。
转自:http://www.cdtarena.com/javapx/201304/8327.html
[java]viewplaincopyprint?
importjava.util.Date;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Semaphore;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicInteger;
publicclassFlowConcurrentController{
//每秒并发访问控制数量
finalstaticintMAX_QPS=10;
//并发控制信号量
finalstaticSemaphoresemaphore=newSemaphore(MAX_QPS);
//监控每秒并发访问次数(理论上accessCount.get()<=10)
finalstaticAtomicIntegeraccessCount=newAtomicInteger(0);
//模拟远程访问
privatestaticvoidremoteCall(inti,intj){
System.out.println(String.format("%s-%s:%d%d",newDate(),Thread.currentThread(),i,j));
}
privatestaticvoidreleaseWork(){//每秒release一次
//releasesemaphorethread
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(newRunnable(){
@Override
publicvoidrun(){
semaphore.release(accessCount.get());
accessCount.set(0);
}
},1000,1000,TimeUnit.MILLISECONDS);
}
//模拟并发访问控制
privatestaticvoidsimulateAccess(finalintm,finalintn)
throwsException{//m:线程数;n:调用数
ExecutorServicepool=Executors.newFixedThreadPool(100);
for(inti=m;i>0;i--){
finalintx=i;
pool.submit(newRunnable(){
@Override
publicvoidrun(){
for(intj=n;j>0;j--){
try{
Thread.sleep(5);
}catch(InterruptedExceptione){
e.printStackTrace();
}
semaphore.acquireUninterruptibly(1);
accessCount.incrementAndGet();
remoteCall(x,j);
}
}
});
}
pool.shutdown();
pool.awaitTermination(1,TimeUnit.HOURS);
}
publicstaticvoidmain(String[]args)throwsException{
//开启releaseWork
releaseWork();
//开始模拟lotsofconcurrentcalls:100*1000
simulateAccess(100,1000);
}
}
上面的代码中存在一个小问题,就是accessCount的释放后,存在负数的情况,也就是说高并发的情况下每秒会存在>MAX_QPS次的并发访问次数,还不能做到非常精确控制。
期待大家更加简单和轻量的方式。
转自:
相关文章推荐
- 一种高并发流控程序的简单轻量实现
- 一种高并发流控程序的简单轻量实现
- 一种高并发流控程序的简单轻量实现
- 一种高并发流控程序的简单轻量实现
- 一种简单的方法在程序中实现透明效果(JAVA)
- 一种简单的方法在程序中实现透明效果(JAVA)(源代码)
- C++实现的一个简单的多线程程序
- 用java编写一个简单的字符串加密解密程序,将字符串分成若干行,实现字符串以一列一列读取,并还原之前输入的字符串。这里实现4行输出。
- 简单的自动更新程序实现
- 通用型C/C++程序性能测试Benchmark的简单实现
- C#使用Mutex简单实现程序单实例运行的方法
- 我的实现图像简单复制的两个c程序
- hadoop, 用java 和 python 实现 worldcount 简单单词提取累加小程序
- 程序自删除的一种实现方式
- 简单实现udp网络程序发送,接受数据
- iphone程序中实现截屏的一种方法
- android客户端简单的聊天程序实现
- 一种简单,轻量,灵活的C#对象转Json对象的方案(续)
- 利用栈实现简单的求解迷宫程序
- 简单的用栈来实现平衡符号的程序