您的位置:首页 > 大数据

几种数据库的大数据批量插入

2013-01-16 16:40 423 查看
在之前只知道SqlServer支撑数据批量插入,殊不知道Oracle、SQLite和MySql也是支撑的,不过Oracle须要应用Orace.DataAccess驱动

起首说一下,IProvider里有一个用于实现批量插入的插件办事接口IBatcherProvider

/// <summary>

/// 供给数据批量处理惩罚的办法。

/// </summary>

public interface IBatcherProvider : IProviderService

{

/// <summary>

/// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。

/// </summary>

/// <param name="dataTable">要批量插入的
<see cref="DataTable"/>。</param>

/// <param name="batchSize">每批次写入的数据量。</param>

void Insert(DataTable dataTable, int batchSize =
10000);

}

一、SqlServer数据批量插入

SqlServer的批量插入很简单,应用SqlBulkCopy就可以,以下是该类的实现:

/// <summary>

/// 为 System.Data.SqlClient 供给的用于批量操纵的办法。

/// </summary>

public sealed class MsSqlBatcher : IBatcherProvider

{

/// <summary>

/// 获取或设置供给者办事的高低文。

/// </summary>

public ServiceContext ServiceContext { get;
set; }

/// <summary>

/// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。

/// </summary>

/// <param name="dataTable">要批量插入的
<see cref="DataTable"/>。</param>

/// <param name="batchSize">每批次写入的数据量。</param>

public void Insert(DataTable dataTable,
int batchSize = 10000)

{

Checker.ArgumentNull(dataTable, "dataTable");

if (dataTable.Rows.Count == 0)

{

return;

}

using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection())

{

try

{

connection.TryOpen();

//给表名加上前后导符

var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName);

using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity,
null)

{

DestinationTableName = tableName,

BatchSize = batchSize

})

{

//轮回所有列,为bulk添加映射

dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement);

bulk.WriteToServer(dataTable);

bulk.Close();

}

}

catch (Exception exp)

{

throw new BatcherException(exp);

}

finally

{

connection.TryClose();

}

}

}

}

SqlBulkCopy的ColumnMappings中列的名称受大小写敏感限制,是以在机关DataTable的时辰应请重视列名要与表一致。

以上没有应用事务,应用事务在机能上会有必然的影响,若是要应用事务,可以设置SqlBulkCopyOptions.UseInternalTransaction。

二、Oracle数据批量插入

System.Data.OracleClient不支撑批量插入,是以只能应用Oracle.DataAccess组件来作为供给者。

/// <summary>

/// Oracle.Data.Access 组件供给的用于批量操纵的办法。

/// </summary>

public sealed class OracleAccessBatcher : IBatcherProvider

{

/// <summary>

/// 获取或设置供给者办事的高低文。

/// </summary>

public ServiceContext ServiceContext { get;
set; }

/// <summary>

/// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。

/// </summary>

/// <param name="dataTable">要批量插入的
<see cref="DataTable"/>。</param>

/// <param name="batchSize">每批次写入的数据量。</param>

public void Insert(DataTable dataTable,
int batchSize = 10000)

{

Checker.ArgumentNull(dataTable, "dataTable");

if (dataTable.Rows.Count == 0)

{

return;

}

using (var connection = ServiceContext.Database.CreateConnection())

{

try

{

connection.TryOpen();

using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())

{

if (command == null)

{

throw new BatcherException(new ArgumentException("command"));

}

command.Connection = connection;

command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);

command.uteNonQuery();

}

}

catch (Exception exp)

{

throw new BatcherException(exp);

}

finally

{

connection.TryClose();

}

}

}

/// <summary>

/// 生成插入数据的sql语句。

/// </summary>

/// <param name="database"></param>

/// <param name="command"></param>

/// <param name="table"></param>

/// <returns></returns>

private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)

{

var names = new StringBuilder();

var values = new StringBuilder();

//将一个DataTable的数据转换为数组的数组

var data = table.ToArray();

//设置ArrayBindCount属性

command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count,
null);

var syntax = database.Provider.GetService<ISyntaxProvider>();

for (var i = 0; i < table.Columns.Count; i++)

{

var column = table.Columns[i];

var parameter = database.Provider.DbProviderFactory.CreateParameter();

if (parameter == null)

{

continue;

}

parameter.ParameterName = column.ColumnName;

parameter.Direction = ParameterDirection.Input;

parameter.DbType = column.DataType.GetDbType();

parameter.Value = data[i];

if (names.Length > 0)

{

names.Append(",");

values.Append(",");

}

names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName));

values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);

command.Parameters.Add(parameter);

}

return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);

}

}

以上最首要的一步,就是将DataTable转为数组的数组默示,即object[][],前数组的上标是列的个数,后数组是行的个数,是以轮回Columns将后数组作为Parameter的值,也就是说,参数的值是一个数组。而语句与一般的插入语句没有什么不一样。

三、SQLite数据批量插入

SQLite的批量插入只需开启事务就可以了,这个具体的道理不得而知。

public sealed class SQLiteBatcher : IBatcherProvider

{

/// <summary>

/// 获取或设置供给者办事的高低文。

/// </summary>

public ServiceContext ServiceContext { get;
set; }

/// <summary>

/// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。

/// </summary>

/// <param name="dataTable">要批量插入的
<see cref="DataTable"/>。</param>

/// <param name="batchSize">每批次写入的数据量。</param>

public void Insert(DataTable dataTable,
int batchSize = 10000)

{

Checker.ArgumentNull(dataTable, "dataTable");

if (dataTable.Rows.Count == 0)

{

return;

}

using (var connection = ServiceContext.Database.CreateConnection())

{

DbTransaction transcation = null;

try

{

connection.TryOpen();

transcation = connection.BeginTransaction();

using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())

{

if (command == null)

{

throw new BatcherException(new ArgumentException("command"));

}

command.Connection = connection;

command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable);

if (command.CommandText == string.Empty)

{

return;

}

var flag = new AssertFlag();

dataTable.EachRow(row =>

{

var first = flag.AssertTrue();

ProcessCommandParameters(dataTable, command, row, first);

command.uteNonQuery();

});

}

transcation.Commit();

}

catch (Exception exp)

{

if (transcation != null)

{

transcation.Rollback();

}

throw new BatcherException(exp);

}

finally

{

connection.TryClose();

}

}

}

private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row,
bool first)

{

for (var c = 0; c < dataTable.Columns.Count; c++)

{

DbParameter parameter;

//初次创建参数,是为了应用缓存

if (first)

{

parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();

parameter.ParameterName = dataTable.Columns[c].ColumnName;

command.Parameters.Add(parameter);

}

else

{

parameter = command.Parameters[c];

}

parameter.Value = row[c];

}

}

/// <summary>

/// 生成插入数据的sql语句。

/// </summary>

/// <param name="database"></param>

/// <param name="table"></param>

/// <returns></returns>

private string GenerateInserSql(IDatabase database, DataTable table)

{

var syntax = database.Provider.GetService<ISyntaxProvider>();

var names = new StringBuilder();

var values = new StringBuilder();

var flag = new AssertFlag();

table.EachColumn(column =>

{

if (!flag.AssertTrue())

{

names.Append(",");

values.Append(",");

}

names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));

values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);

});

return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);

}

}

四、MySql数据批量插入

/// <summary>

/// 为 MySql.Data 组件供给的用于批量操纵的办法。

/// </summary>

public sealed class MySqlBatcher : IBatcherProvider

{

/// <summary>

/// 获取或设置供给者办事的高低文。

/// </summary>

public ServiceContext ServiceContext { get;
set; }

/// <summary>

/// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。

/// </summary>

/// <param name="dataTable">要批量插入的
<see cref="DataTable"/>。</param>

/// <param name="batchSize">每批次写入的数据量。</param>

public void Insert(DataTable dataTable,
int batchSize = 10000)

{

Checker.ArgumentNull(dataTable, "dataTable");

if (dataTable.Rows.Count == 0)

{

return;

}

using (var connection = ServiceContext.Database.CreateConnection())

{

try

{

connection.TryOpen();

using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())

{

if (command == null)

{

throw new BatcherException(new ArgumentException("command"));

}

command.Connection = connection;

command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);

if (command.CommandText == string.Empty)

{

return;

}

command.uteNonQuery();

}

}

catch (Exception exp)

{

throw new BatcherException(exp);

}

finally

{

connection.TryClose();

}

}

}

/// <summary>

/// 生成插入数据的sql语句。

/// </summary>

/// <param name="database"></param>

/// <param name="command"></param>

/// <param name="table"></param>

/// <returns></returns>

private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)

{

var names = new StringBuilder();

var values = new StringBuilder();

var types = new List<DbType>();

var count = table.Columns.Count;

var syntax = database.Provider.GetService<ISyntaxProvider>();

table.EachColumn(c =>

{

if (names.Length > 0)

{

names.Append(",");

}

names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName));

types.Add(c.DataType.GetDbType());

});

var i = 0;

foreach (DataRow row in table.Rows)

{

if (i > 0)

{

values.Append(",");

}

values.Append("(");

for (var j = 0; j < count; j++)

{

if (j > 0)

{

values.Append(", ");

}

var isStrType = IsStringType(types[j]);

var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j);

if (parameter != null)

{

values.Append(parameter.ParameterName);

command.Parameters.Add(parameter);

}

else if (isStrType)

{

values.AppendFormat("""{0}""", row[j]);

}

else

{

values.Append(row[j]);

}

}

values.Append(")");

i++;

}

return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values);

}

/// <summary>

/// 断定是否为字符串类别。

/// </summary>

/// <param name="dbType"></param>

/// <returns></returns>

private bool IsStringType(DbType dbType)

{

return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;

}

/// <summary>

/// 创建参数。

/// </summary>

/// <param name="provider"></param>

/// <param name="isStrType"></param>

/// <param name="dbType"></param>

/// <param name="value"></param>

/// <param name="parPrefix"></param>

/// <param name="row"></param>

/// <param name="col"></param>

/// <returns></returns>

private DbParameter CreateParameter(IProvider provider,
bool isStrType, DbType dbType, object value, char parPrefix,
int row, int col)

{

//若是生成全部的参数,则速度会很慢,是以,只稀有据类型为字符串(包含""号)和日期型时才添加参数

if ((isStrType && value.ToString().IndexOf(""\"""") != -1) || dbType == DbType.DateTime)

{

var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col);

var parameter = provider.DbProviderFactory.CreateParameter();

parameter.ParameterName = name;

parameter.Direction = ParameterDirection.Input;

parameter.DbType = dbType;

parameter.Value = value;

return parameter;

}

return null;

}

}

MySql的批量插入,是将值全部写在语句的values里,例如, batcher(id, name) values(1, ""1"", 2, ""2"", 3, ""3"", ........ 10, ""10"")。

五、测试

接下来写一个测试用例来看一下应用批量插入的结果。

[Test]

public void TestBatchInsert()

{

Console.WriteLine(TimeWatcher.Watch(() =>

InvokeTest(database =>

{

var table = new DataTable("Batcher");

table.Columns.Add("Id",
typeof(int));

table.Columns.Add("Name1",
typeof(string));

table.Columns.Add("Name2",
typeof(string));

table.Columns.Add("Name3",
typeof(string));

table.Columns.Add("Name4",
typeof(string));

//机关100000条数据

for (var i = 0; i <
100000; i++)

{

table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());

}

//获取 IBatcherProvider

var batcher = database.Provider.GetService<IBatcherProvider>();

if (batcher == null)

{

Console.WriteLine("不支撑批量插入。");

}

else

{

batcher.Insert(table);

}

//输出batcher表的数据量

var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");

Console.WriteLine("当前共有 {0} 条数据", database.uteScalar(sql));

})));

}

以下表中列出了四种数据库生成10万条数据各耗用的时候

数据库
耗用时候
MsSql00:00:02.9376300
Oracle00:00:01.5155959
SQLite00:00:01.6275634
MySql00:00:05.4166891
数据库
耗用时候
MsSql00:00:02.9376300
Oracle00:00:01.5155959
SQLite00:00:01.6275634
MySql00:00:05.4166891
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: