您的位置:首页 > 编程语言 > Java开发

Java 多线程 同类型任务只能每次执行指定个数

2015-03-21 21:15 393 查看
背景:多线程访问liunx,但是liunx每次最多访问是有限制的,不然会拒绝访问。

这里做法是每次执行相同类型用户访问不超过指定个数,必须执行完释放才继续执行未完相同类型任务。

import java.util.ArrayList;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AAA implements Cloneable {

// 能够访问用户机器的最大数
private final static int COUNT_INT = 15;

// 记录每种用户能登陆的最大个数
private static Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();

// 未完需要继续执行的任务
private static Collection<Probe2> probeList = new ConcurrentLinkedQueue<Probe2>();

/**
* @param args
*/
public static void main(String[] args) {
long startTime = System.currentTimeMillis(); // 获取开始时间
ExecutorService exServ = Executors.newFixedThreadPool(25);

Probe2 aa = null;
for (int k = 0; k < 300; k++) {
aa = new Probe2();
aa.setAccId("a");
probeList.add(aa);
}
Probe2 bb = null;
for (int k = 0; k < 300; k++) {
bb = new Probe2();
bb.setAccId("b");
probeList.add(bb);
}
Probe2 cc = null;
for (int k = 0; k < 300; k++) {
cc = new Probe2();
cc.setAccId("c");
probeList.add(cc);
}
System.out.println("总共需要执行的任务数" + probeList.size());

List<Future<String>> futList = new ArrayList<Future<String>>();

// 定时执行
ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1);
AAA tt = new AAA();
Runnable beeper = tt.timesss(probeList, exServ, futList, startTime);
scheduler.scheduleAtFixedRate(beeper, 1, 1, TimeUnit.SECONDS);

// 暂停5S,后面再往里面添加不同的任务
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}

Probe2 aa1 = null;
for (int k = 0; k < 300; k++) {
aa1 = new Probe2();
aa1.setAccId("fff");
probeList.add(aa1);
}
Probe2 bb1 = null;
for (int k = 0; k < 300; k++) {
bb1 = new Probe2();
bb1.setAccId("rrr");
probeList.add(bb1);
}
Probe2 cc1 = null;
for (int k = 0; k < 300; k++) {
cc1 = new Probe2();
cc1.setAccId("www");
probeList.add(cc1);
}
}

private Runnable timesss(final Collection<Probe2> probeList,
final ExecutorService exServ, final List<Future<String>> futList,
final long startTime) {
return (new Runnable() {
public void run() {
if (probeList != null) {
if (probeList.isEmpty()) {
long endTime = System.currentTimeMillis(); // 获取结束时间
System.out.println("程序运行时间: " + (endTime - startTime)
/ 1000 + "s");
return;
}
}
AAA tt = new AAA();
tt.forfor(counts, probeList, exServ, futList, startTime);
}
});
}

public void forfor(Map<String, Integer> counts,
Collection<Probe2> probeList, ExecutorService exServ,
List<Future<String>> futList, long startTime) {

Iterator<Probe2> iter = probeList.iterator();
Probe2 pr = null;
while (iter.hasNext()) {
pr = iter.next();
if (!futList.isEmpty()) {
// System.out.println("上次执行个数" + futList.size());
for (Future<String> fut : futList) {
if (fut.isDone()) {
try {
String endStr = fut.get();
int mark = counts.get(endStr);
System.out.println(endStr + "完成当前个数" + mark);
if (mark == 0) {
counts.remove(endStr);
} else {
counts.put(endStr, mark - 1);
}
} catch (Exception e) {
}
}
}
}

if (counts.get(pr.getAccId()) == null) {
futList.add(submitTask(exServ, pr, counts, probeList));
counts.put(pr.getAccId(), 1);
iter.remove();
} else {
// 先判断是否大于2个,大于就等待下一次执行了
if (counts.get(pr.getAccId()) < COUNT_INT) {
futList.add(submitTask(exServ, pr, counts, probeList));
counts.put(pr.getAccId(), counts.get(pr.getAccId()) + 1);
iter.remove();
}
}
}

}

private Future<String> submitTask(ExecutorService exServ, Probe2 pr,
Map<String, Integer> counts, Collection<Probe2> probeList) {
AAA a = new AAA();
Future<String> kk = exServ.submit(a.runExc(pr.getAccId()));
return kk;
}

private Callable<String> runExc(final String i) {
return (new Callable<String>() {
public String call() {
System.out.println(i + "开始执行==============");
// if (i.equals("a")) {
try {
Thread.sleep(1500);//模拟任务执行时间1.5S
} catch (InterruptedException e) {
e.printStackTrace();
}
// }

System.out.println(i + "执行完了");

return i + "";
}
});
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: