您的位置:首页 > Web前端 > Node.js

Node.js海量数据处理方案

2018-01-15 17:05 239 查看
今天做实验仿真,600多万条数据,第一次面临这个级别的数据量,激动不已。马不停蹄地随便做了点简单的处理,就让数据在屏幕上动了起来... ...
当跑到4913465条数据的时候,程序就中止了,郁闷... ...难度大数据仿真只有Java、Python这样的大体量的语言才能驾驭?于是,带着一颗不服的赤子之心,上下求索,重新窥探JavaScript语言的精妙。


程序报错界面



初始源码

var readline = require('readline');
var fs = require('fs');
var os = require('os');

var fReadName = './input.txt';
var fRead = fs.createReadStream(fReadName);

var objReadline = readline.createInterface({
input: fRead,
});

var index = 1;
var user_ID = [];
var check_in_time = [];
var latitude = [];
var longitude = [];
var location_ID = [];

objReadline.on('line', (line)=>{

var c = line.split('\t');
user_ID.push(c[0]);
check_in_time.push(c[1]);
latitude.push(c[2]);
longitude.push(c[3]);
location_ID.push(c[4]);
console.log(index,line);
index ++;
});

objReadline.on('close', ()=>{
// console.log('readline close...');
var user = Array.from(new Set(user_ID));
var ti = Array.from(new Set(check_in_time));
var lat = Array.from(new Set(latitude));
var lon = Array.from(new Set(l
4000
ongitude));
var lac = Array.from(new Set(location_ID));
console.log('总记录数:'+ index);
console.log('用户数:'+ ti.length);
console.log('时间:'+ ti.length);
console.log('纬度:'+ lat.length);
console.log('经度:'+ lon.length);
console.log('位置点:'+ lac.length);
});


然后我查了一下系统内存使用情况



并用
node.js
输入当前系统内存的总大小和空闲大小




源码

const os = require('os');

var t = os.totalmem() / 1024 / 1024;
var f = os.freemem() / 1024 / 1024;

console.log("系统内存总大小:" + a + "MB");
console.log("内存空闲大小:" + b + "MB");


查找了相关资料。最后找到了解释。原来是V8引擎对node程序进行了内存限制(64位系统下约1.4GB,32位系统下约为0.7GB)。而且我写的程序没有考虑内存问题,这里用5个数组存放了大量的数据,内存自然就挤爆了。
这个内存限制在浏览器的应用场景下绰绰有余,足以胜任前端页面中的所有需求。但在node中,这却限制了开发者随心所欲使用大内存的想法。那么如何解决node这块短板呢?node里面大体有两种方案:


1. 使用
Buffer
。这不会受到V8堆内存的限制。但是这种大片使用内存的情况依然要小心,即使V8不限制
Buffer
的堆内存大小,物理内存依然有限制。

2. 采用进程外的缓存,例如
Redis
。将缓存转移到外部,减少常驻内存的对象数量,让垃圾回收更高效。

下面,我采用方案二,对程序进行性能优化,先将
readline
循环体内的声明提到外面,然后去掉
readline
循环体里面的5个数组,改用
Redis
。运行结果如下:



源码

var readline = require('readline');
var fs = require('fs');
var os = require('os');

var redis = require("redis"),
client = redis.createClient();

var fReadName = './input.txt';

//总记录数
var index = 1;
// 缓冲数组,用于readline里面的逐行处理。
var c = [];

var fRead = fs.createReadStream(fReadName);

var objReadline = readline.createInterface({
input: fRead,
});

// 启动计时器
console.time("处理耗时");

// 如果Redis出现错误,接受打印错误
client.on("error", function (err) {
console.log("Error " + err);
});

// readline 逐行读取数据,并处理。
objReadline.on('line', (line)=>{

c = line.split('\t');

client.sadd("user_ID", c[0]);
client.sadd("check_in_time", c[1]);
client.sadd("latitude", c[2]);
client.sadd("longitude", c[3]);
client.sadd("location_ID", c[4]);

// 查看进程的内存占用
var m_rss = (process.memoryUsage().rss / 1024 / 1024).toFixed(2);
var s_rss = (os.freemem() / 1024 / 1024).toFixed(2);
console.log(index + "  " + m_rss + "MB" + "  " + s_rss + "MB");

index ++;
});

objReadline.on('close', ()=>{

console.log('总记录数:'+ (index-1));

client.multi()
.smembers("user_ID")
.exec(function (err, replies) {
// err,replies 返回null或者Array(内包双层数组,length值为1,[[0,1,2]])
// 此处的Array为内包双层数组,例如:[[0,1,2]],replies.length值为1.
// 若要求此处数组里面数据的个数需要这样处理:replies[0].length
console.log("用户数:" + replies[0].length);

});
client.multi()
.smembers("check_in_time")
.exec(function (err, replies) {
console.log("时间:" + replies[0].length);
});
client.multi()
.smembers("latitude")
.exec(function (err, replies) {
console.log("纬度:" + replies[0].length);
});
client.multi()
.smembers("longitude")
.exec(function (err, replies) {
console.log("经度:" + replies[0].length);
});
client.multi()
.smembers("location_ID")
.exec(function (err, replies) {
console.log("位置点:" + replies[0].length);
});

// 停止计时,输出时间
console.timeEnd("处理耗时");

});
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: