您的位置:首页 > Web前端

【原创】有关Buffer使用,让你的日志类库解决IO高并发写

2017-03-19 12:55 417 查看
【本人原创[b]】,欢迎交流和分享技术,转载请附上如下内容:[/b]

[b]如果你觉得这篇文章对你有帮助,请记得帮我点赞, 谢谢![/b]

作者:itshare 【转自】http://www.cnblogs.com/itshare/

通常我们知道,当一个日志接口被外部程序多个线程请求后,如果没有使用Buffer和异步写入IO的处理。
CPU就会一直处于加锁和解锁的频繁切换,这样加上等待每次线程锁中的IO处理完毕的时间,
高并发调用日志接口的整个过程中响应和处理速度就会严重变慢,并且让CPU一直居高不下。

这里,Coding First,先看对比结果,最后我再附上LogBufferPool的代码实现。

下面我们分别做两组实验:时间单位为秒
100个线程,并发消费100000个请求。看下面每组实验的结果。

实验1:普通的日志接口,无Buffer缓冲池和异步写入IO的处理。







实验2:改造后日志接口,有Buffer缓冲池和异步写入IO的处理。







最后,在这里贴上我的代码实现:LogBufferPool

namespace ITS.Base.Comm.Logger_FastIO
{
/// <summary>
/// 日志缓冲池
/// </summary>
public class LogBufferPool :IDisposable
{
/// <summary>
/// 工作日期
/// </summary>
private DateTime worktime
{
get;
set;
}

/// <summary>
/// 上一次写入时间
/// </summary>
private DateTime LastWriteTime
{
get;
set;
}

/// <summary>
/// 日志的文件路径
/// </summary>
private string fpath
{
get;
set;
}

/// <summary>
/// 最大行数 - write buffer
/// </summary>
public long MaxRows
{
get;
private set;
}

/// <summary>
/// 最大字数 - write buffer
/// </summary>
public long MaxSize
{
get;
private set;
}

/// <summary>
/// 当前字数 - write buffer
/// </summary>
public long CurrnetSize
{
get;
private set;
}
/// <summary>
/// 当前行数 - write buffer
/// </summary>
public long CurrnetRows
{
get;
private set;
}

/// <summary>
/// Lock of Buffer write
/// </summary>
private static object Lock_Write_IO = 1;
/// <summary>
/// Flag of last write Buffter
/// </summary>
private static bool IsLastWriteCompleted = true;
/// <summary>
/// Time of Buffer write
/// </summary>
static System.Threading.Timer timer_work = null;

/// <summary>
/// 文件流
/// </summary>
private Dictionary<int, FileStream> Dic_Stream = null;

/// <summary>
/// 日志消息
/// </summary>
private Dictionary<int, List<string>> Dic_MesgData = null;

/// <summary>
/// 构造函数 - 日志缓冲池
/// </summary>
/// <param name="fpath">日志文件路径:logType</param>
/// <param name="max_size">最大字数 - write buffer</param>
/// <param name="max_rows">最大行数 - write buffer</param>
public LogBufferPool(string fpath, long max_size = 50000, long max_rows = 1000)
{
this.worktime = DateTime.Now;
this.LastWriteTime = DateTime.Now;
this.fpath = fpath;
this.MaxSize = max_size;
this.MaxRows = max_rows;
this.Dic_Stream = new Dictionary<int, FileStream>();
this.Dic_MesgData = new Dictionary<int, List<string>>();

IsLastWriteCompleted = true;
if (timer_work == null)
{
// 1*1000: 1秒后启动计时器
// 执行计划:每隔开秒执行一次
timer_work = new System.Threading.Timer(new System.Threading.TimerCallback(WriteBufferToFile), this, 1 * 1000, 5 * 1000);
}
}

/// <summary>
/// 写完日志后,再释放资源
/// </summary>
public void Dispose()
{
try
{
Dictionary<int, List<string>> dic = this.Dic_MesgData;
if (dic != null && dic.Count > 0)
{
foreach (KeyValuePair<int, List<string>> pair in this.Dic_MesgData)
{
WriteFile((LogTypeEnum)pair.Key, pair.Value);
}
}
}
catch (Exception ex)
{
// log
}
finally
{
GC.Collect();
}
}

/// <summary>
/// 添加日志消息
/// </summary>
/// <param name="logType">日志类别</param>
/// <param name="msg">日志内容</param>
/// <returns></returns>
public bool AddLog(LogTypeEnum logType, string msg)
{
bool flag = false;
List<string> list = null;
int n = 0; // 写入IO的buffer文件个数
List<int> keys = null; // 写入IO的buffer文件个数

//long size_text = 0, row_text = 0;
long index_row = 0, count_row = list != null ? list.Count : 0;

try
{
if (!this.Dic_MesgData.TryGetValue((int)logType, out list))
{
list = new List<string>();
this.Dic_MesgData.Add((int)logType, list);

//FileStream fs = File.Open(string.Format(fpath, logType), FileMode.Append, FileAccess.Write);
//// fs.WriteTimeout = 24 * 60 * 60;
//this.Dic_Stream.Add((int)logType, fs);
}

// 添加日志消息
list.Add(msg);
index_row++;

this.CurrnetSize += msg.Length;
this.CurrnetRows++;

// 根据缓冲池大小,定期清理和写入IO文件
//if ((this.CurrnetSize >= this.MaxSize
//    || this.CurrnetRows >= this.MaxRows)
//    || (DateTime.Now - this.LastWriteTime).TotalSeconds > 10)
//{
//    this.CurrnetSize = 0;
//    this.CurrnetRows = 0;

//    keys = Dic_MesgData.Keys.ToList();
//    foreach (int key in keys)
//    {
//        List<string> list_old = this.Dic_MesgData[key];
//        this.Dic_MesgData[key] = new List<string>();

//        bool flag_write = this.WriteFile((LogTypeEnum)key, list_old);
//        if (flag_write)
//        {
//            n++;
//        }
//    }
//}

//WriteBufferToFile(null);

flag = true;

// flag = keys != null && keys.Count > 0 ? n == keys.Count : true;
}
catch (Exception ex)
{
// log info
}

return flag;
}

private void WriteBufferToFile(object obj)
{
List<string> list = null;
int n = 0; // 写入IO的buffer文件个数
List<int> keys = null; // 写入IO的buffer文件个数

lock (Lock_Write_IO)
{
if (IsLastWriteCompleted) // 判读上一次写入IO是否完成
{
// 根据缓冲池大小,定期清理和写入IO文件
if ((this.CurrnetSize >= this.MaxSize
|| this.CurrnetRows >= this.MaxRows))
{
IsLastWriteCompleted = false;

this.CurrnetSize = 0;
this.CurrnetRows = 0;

keys = Dic_MesgData.Keys.ToList();
foreach (int key in keys)
{
List<string> list_old = this.Dic_MesgData[key];
this.Dic_MesgData[key] = new List<string>();

bool flag_write = this.WriteFile((LogTypeEnum)key, list_old);
if (flag_write)
{
n++;
}
}

IsLastWriteCompleted = true;
}
}
}
}

/// <summary>
/// 异步写入日志文件
/// </summary>
/// <param name="logType">日志类别</param>
/// <param name="list">日志内容</param>
/// <returns></returns>
public bool WriteFile(LogTypeEnum logType, List<string> list)
{
{
bool flag = false;

FileStream fs = null;
StringBuilder sb = new StringBuilder();
byte[] data = null;

long size_text = 0, row_text = 0;
long index_row = 0, count_row = list != null ? list.Count : 0;

//if (!this.Dic_Stream.TryGetValue((int)logType, out fs))
//{
//    return false;
//}

try
{
fs = File.Open(string.Format(fpath, logType), FileMode.Append, FileAccess.Write);

foreach (string item in list)
{
sb.Append(item);
index_row++; // 当前位置

size_text += item.Length;
row_text++;

if ((size_text >= 10 * 10000 || row_text >= 1000)
|| (index_row == count_row && sb.Length > 0))
{
size_text = 0;
row_text = 0;

// wrire file
data = Encoding.UTF8.GetBytes(sb.ToString());

#region 异步写入
//IAsyncResult asyc = fs.BeginWrite(data, 0, data.Length, (o) =>
//    {
//        object ret = o;
//    }, null);
//asyc.AsyncWaitHandle.WaitOne();
//fs.EndWrite(asyc);
#endregion

#region 同步写入
fs.Write(data, 0, data.Length);
#endregion

fs.Flush(); // test code

size_text = 0;
row_text = 0;

data = null;
sb = null;
sb = new StringBuilder();
}
}

flag = index_row == count_row;
}
catch (Exception ex)
{
// log info
}
finally
{
if (sb != null)
sb = null;
if (data != null)
data = null;
if (list != null)
list = null;

if (fs != null)
fs.Dispose();
}

return flag;
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐