您的位置:首页 > 编程语言 > Java开发

java多线程读取多个文件 导入数据库

2017-05-16 10:53 609 查看
近期在做Java读文件的项目,由于数据量较大,因此研究了一下多线程,总结了一下:
一. 多个线程读文件和单个线程读文件,效率差不多,甚至可能不如单线程,原因如下:
如果只是单纯的读文件,一个线程足够了,因为一般瓶颈是在磁盘io上,多个线程只会在磁盘io上阻塞。因为不同文件的读写,会造成磁头的频繁转换,磁头的频繁转换要比读取磁盘的时间更长。
但是一般是读一小块做一次处理,然后再读下一块,这样只用一个线程磁盘io有空闲的时间,就可以用多线程处理,有的线程在读数据有的线程在处理数据。而且磁盘是有缓存的,一次读48行,可能会缓存后面的1m内容,下n次其他线程来读的时候磁盘可以直接从缓存中取数据。估计线程数不会超过10个,太多线程仍然会阻塞在磁盘io上。但是随机读取文件无法利用缓存机制,而且硬盘不断的重新定位会花费大量的寻道时间,估计效率还比不上多个线程用同一个指针顺序读取文件。理论推测,具体还是得自己写程序跑一下。(原文链接:http://www.zhihu.com/question/20149395/answer/14136499)
二. 所以这种情况下,最好有个线程去读取文件,其他的线程去处理文件数据中的业务逻辑处理(参考:http://www.dewen.net.cn/q/1334)
解决方案: 

(1)首先,开辟一个Reader线程, 该线程负责读取文件,将读取记录存入队列中(LinkedBlockingQueue 或者 ArrayBlockingQueue)
ArrayBlockingQueue跟LinkedBlockingQueue的区别:

队列中的锁的实现不同 

ArrayBlockingQueue中的锁是没有分离的,即生产和消费用的是同一个锁; 

LinkedBlockingQueue中的锁是分离的,即生产用的是putLock,消费是takeLock

在生产或消费时操作不同 

ArrayBlockingQueue基于数组,在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例; 

LinkedBlockingQueue基于链表,在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会生成一个额外的Node对象,这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

队列大小初始化方式不同 

ArrayBlockingQueue是有界的,必须指定队列的大小; 

LinkedBlockingQueue是无界的,可以不指定队列的大小,但是默认是Integer.MAX_VALUE。当然也可以指定队列大小,从而成为有界的。

注意: 

在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出。
在使用ArrayBlockingQueue和LinkedBlockingQueue分别对1000000个简单字符做入队操作时, 

LinkedBlockingQueue的消耗是ArrayBlockingQueue消耗的10倍左右, 

即LinkedBlockingQueue消耗在1500毫秒左右,而ArrayBlockingQueue只需150毫秒左右。
按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
(2)再开辟若干个线程,负责从队列中取数据,并插入数据库
(3)Reader线程读取完成后,应“通知”处理线程,当处理线程处理完队列的记录,并发现Reader线程已终止的时候,就停止了。 

(参考:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-two-phase-termination

终止线程的三种方法
使用退出标志,使线程正常退出,也就是当run方法完成后线程终止(一般线程中的任务是放在一个循环中,需要退出时只需破坏循环的条件,退出循环即可)。
使用stop方法强行终止线程(这个方法不推荐使用,因为stop和suspend、resume一样,也可能发生不可预料的结果)。
使用interrupt方法中断线程。
批量读文件入库程序可参考: 
http://blog.csdn.net/u010323023/article/details/52403046?locationNum=5
一对多实例(参考链接:http://lucky-xingxing.iteye.com/blog/2054071):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Created with IntelliJ IDEA.
* Date: 4/24/14
* Time: 9:56 AM
* To change this template use File | Settings | File Templates.
*
* 生产者与消费者模型中,要保证以下几点:
* 1 同一时间内只能有一个生产者生产
* 2 同一时间内只能有一个消费者消费
* 3 生产者生产的同时消费者不能消费
* 4 消费者消费的同时生产者不能生产
* 5 共享空间空时消费者不能继续消费
* 6 共享空间满时生产者不能继续生产
*
* 使用并发库中的BlockingQueue(阻塞队列) 实现生产者与消费者
*/
public class WaitNoticeDemo {
public static void main(String[] args) {

//固定容器大小为10
BlockingQueue<Food> foods = new LinkedBlockingQueue<Food>(100);

boolean completeFlag = false;

Thread produce = new Thread(new Produce(foods));
Thread consume1 = new Thread(new Consume(foods));
Thread consume2 = new Thread(new Consume(foods));
produce.start();
consume1.start();
consume2.start();

}
}

/**
* 生产者
*/
class Produce implements Runnable{
private BlockingQueue<Food> foods;
private Integer count = 0;
private boolean exitFlag = false;
Produce(BlockingQueue<Food> foods) {
this.foods = foods;
}

//@Override
public void run() {
int i = 0;
while (i<50){
try {
//当生产的食品数量装满了容器,那么在while里面该食品容器(阻塞队列)会自动阻塞  wait状态 等待消费
foods.put(new Food("食品"+i));
i++;
if(i == 50){
foods.put(new Food("end"));

}
} catch (InterruptedException e) {
e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
}
}//while

System.out.println("生产者线程结束");
}
}

/**
* 消费者
*/
class Consume implements Runnable {
private BlockingQueue<Food> foods;
private boolean flag = true;

Consume(BlockingQueue<Food> foods){
this.foods = foods;
}
//@Override
public void run() {
System.out.println("消费者线程 - " + Thread.currentThread().getName() + "启动");
long start = System.currentTimeMillis(); // 记录起始时间
try {
//Thread.sleep(3);  //用于测试当生产者生产满10个食品后是否进入等待状态
while (flag){
//当容器里面的食品数量为空时,那么在while里面该食品容器(阻塞队列)会自动阻塞  wait状态 等待生产
Food food = foods.take();
System.out.println(Thread.currentThread().getName() + "消费"+food.getName());

if(("end").equals(food.getName())){
flag = false;
foods.put(food);//将结束标志放进队列  以防别的消费者线程看不到

long end = System.currentTimeMillis(); // 记录起始时间
System.out.println("execuete time : " + (end-start));
}

}//while

System.out.println("消费者线程结束");
} catch (InterruptedException e) {
e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
}
}
}

/**
* 食品
*/
class Food{
private String name;

String getName() {
return name;
}

Food(String name){
this.name = name;
System.out.println("生产"+name);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
三. 最近碰到csv文件上传存入数据库后,数据库存储空间远大于实际文件大小的情况,以下是针对该种情况的解决方案: 

1. 设置的字段类型、长度尽可能和文件中对应字段接近; 

2. 不要CHAR, 多用VARCHAR,在数据量比较大时,VARCHAR的优势特别明显。
四. 对于处理大数据量的记录,并将处理结果写入文件中的处理方案: 

方案一(适合于处理和输出的数据量都很大的情况): 

生产者:多个线程 读取一定量的数据并处理,然后将处理结果封装成一个队列元素,装进阻塞队列中 

消费者: 一个线程 取元素 追加写文件(csv) (多个线程写文件是不安全的)
方案二(目前在使用的,适用于需要处理的数据量大,但输出的数据量不大的情况): 

生产者:一个线程,分页查询部分数据,将其封装成队列元素装进队列中 

消费者:多个线程 ,从队列中取出数据元素并处理,存储处理结果。 

生产者和消费者执行完毕后,再集中将消费者处理的结果一个个输出到相应文件中
五. 数据记录验证问题 

对于用数据库中的一个数据表验证另一个数据表中数据的情况,应采用SQL连接查询(适用于待验证和标准数据都在数据库的情况);
而对于用数据库中的一个数据表验证文件中的数据记录的情况,则需要首先根据某些关键字从数据库中查出相关记录,然后在内存中进行数据的验证处理工作,避免多次访问数据库(适用于待验证数据不多,而验证的标准数据很多的情况)。
六.文件字符串长度大于数据库规定长度时,截断处理 

数据库中 nchar (nvarchar)类型中文字符和英文字符长度都为1,nchar(10)的字段可以存放十个中英文字符,而char(varchar)类型中文字符长度为2,,英文字符长度为1,char(10)的字段可以存放五个中文字符,10个英文字符。
针对char类型,中文字符长度为2,英文字符长度为1的情况,计算字符串长度和对字符串截断处理的代码如下:
// 判断一个字符是否是中文
public static boolean isChineseChar(char c) {
return c >= 0x4E00 &&  c <= 0x9FA5;// 根据字节码判断
}

//20170302 针对数据库varchar类型 计算字符串长度:中文字符数*2 + 英文字符数<= 数据库字段设定长度
// 判断一个字符串是否含有中文
public static Integer getDBStrLength(String str) {
Integer len = 0;
if(str != null && str.length()>0){
char[] arr = str.toCharArray();
for(int i=0; i<arr.length; i++){
if(isChineseChar(arr[i])){
len += 2;
}else{
len += 1;
}
}//for
}

return len;
}

//20170302 针对数据库varchar类型 根据字符串长度以及长度限制  对字符串进行截断处理
public static String getTruncateStrByMaxLength(String str,Integer maxLen) {
String result = "";
Integer len = 0;
if(str != null && str.length()>0){
char[] arr = str.toCharArray();
for(int i=0; i<arr.length; i++){
if(isChineseChar(arr[i])){
len += 2;
}else{
len += 1;
}
if(len <= maxLen){
result += arr[i];
}else{
break;
}
}//for
}

return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: