您的位置:首页 > 数据库

SQL Server 批量插入数据的两种方法

2013-07-12 11:09 597 查看
在SQL Server 中插入一条数据使用Insert语句,但是如果想要批量插入一堆数据的话,循环使用Insert不仅效率低,而且会导致SQL一系统性能问题。下面介绍SQL Server支持的两种批量数据插入方法:Bulk和表值参数(Table-Valued Parameters)。

运行下面的脚本,建立测试数据库和表值参数。

[c-sharp] view
plaincopy

--Create DataBase

create database BulkTestDB;

go

use BulkTestDB;

go

--Create Table

Create table BulkTestTable(

Id int primary key,

UserName nvarchar(32),

Pwd varchar(16))

go

--Create Table Valued

CREATE TYPE BulkUdt AS TABLE

(Id int,

UserName nvarchar(32),

Pwd varchar(16))

下面我们使用最简单的Insert语句来插入100万条数据,代码如下:

[c-sharp] view
plaincopy

Stopwatch sw = new Stopwatch();

SqlConnection sqlConn = new SqlConnection(

ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);//连接数据库

SqlCommand sqlComm = new SqlCommand();

sqlComm.CommandText = string.Format("insert into BulkTestTable(Id,UserName,Pwd)values(@p0,@p1,@p2)");//参数化SQL

sqlComm.Parameters.Add("@p0", SqlDbType.Int);

sqlComm.Parameters.Add("@p1", SqlDbType.NVarChar);

sqlComm.Parameters.Add("@p2", SqlDbType.VarChar);

sqlComm.CommandType = CommandType.Text;

sqlComm.Connection = sqlConn;

sqlConn.Open();

try

{

//循环插入100万条数据,每次插入10万条,插入10次。

for (int multiply = 0; multiply < 10; multiply++)

{

for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)

{

sqlComm.Parameters["@p0"].Value = count;

sqlComm.Parameters["@p1"].Value = string.Format("User-{0}", count * multiply);

sqlComm.Parameters["@p2"].Value = string.Format("Pwd-{0}", count * multiply);

sw.Start();

sqlComm.ExecuteNonQuery();

sw.Stop();

}

//每插入10万条数据后,显示此次插入所用时间

Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));

}

}

catch (Exception ex)

{

throw ex;

}

finally

{

sqlConn.Close();

}

Console.ReadLine();

耗时图如下:



由于运行过慢,才插入10万条就耗时72390 milliseconds,所以我就手动强行停止了。

下面看一下使用Bulk插入的情况:

bulk方法主要思想是通过在客户端把数据都缓存在Table中,然后利用SqlBulkCopy一次性把Table中的数据插入到数据库

代码如下:

[c-sharp] view
plaincopy

public static void BulkToDB(DataTable dt)

{

SqlConnection sqlConn = new SqlConnection(

ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);

SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConn);

bulkCopy.DestinationTableName = "BulkTestTable";

bulkCopy.BatchSize = dt.Rows.Count;

try

{

sqlConn.Open();

if (dt != null && dt.Rows.Count != 0)

bulkCopy.WriteToServer(dt);

}

catch (Exception ex)

{

throw ex;

}

finally

{

sqlConn.Close();

if (bulkCopy != null)

bulkCopy.Close();

}

}

public static DataTable GetTableSchema()

{

DataTable dt = new DataTable();

dt.Columns.AddRange(new DataColumn[]{

new DataColumn("Id",typeof(int)),

new DataColumn("UserName",typeof(string)),

new DataColumn("Pwd",typeof(string))});

return dt;

}

static void Main(string[] args)

{

Stopwatch sw = new Stopwatch();

for (int multiply = 0; multiply < 10; multiply++)

{

DataTable dt = Bulk.GetTableSchema();

for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)

{

DataRow r = dt.NewRow();

r[0] = count;

r[1] = string.Format("User-{0}", count * multiply);

r[2] = string.Format("Pwd-{0}", count * multiply);

dt.Rows.Add(r);

}

sw.Start();

Bulk.BulkToDB(dt);

sw.Stop();

Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));

}

Console.ReadLine();

}

耗时图如下:



可见,使用Bulk后,效率和性能明显上升。使用Insert插入10万数据耗时72390,而现在使用Bulk插入100万数据才耗时17583。

最后再看看使用表值参数的效率,会另你大为惊讶的。

表值参数是SQL Server 2008新特性,简称TVPs。对于表值参数不熟悉的朋友,可以参考最新的book online,我也会另外写一篇关于表值参数的博客,不过此次不对表值参数的概念做过多的介绍。言归正传,看代码:

[c-sharp] view
plaincopy

public static void TableValuedToDB(DataTable dt)

{

SqlConnection sqlConn = new SqlConnection(

ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);

const string TSqlStatement =

"insert into BulkTestTable (Id,UserName,Pwd)" +

" SELECT nc.Id, nc.UserName,nc.Pwd" +

" FROM @NewBulkTestTvp AS nc";

SqlCommand cmd = new SqlCommand(TSqlStatement, sqlConn);

SqlParameter catParam = cmd.Parameters.AddWithValue("@NewBulkTestTvp", dt);

catParam.SqlDbType = SqlDbType.Structured;

//表值参数的名字叫BulkUdt,在上面的建立测试环境的SQL中有。

catParam.TypeName = "dbo.BulkUdt";

try

{

sqlConn.Open();

if (dt != null && dt.Rows.Count != 0)

{

cmd.ExecuteNonQuery();

}

}

catch (Exception ex)

{

throw ex;

}

finally

{

sqlConn.Close();

}

}

public static DataTable GetTableSchema()

{

DataTable dt = new DataTable();

dt.Columns.AddRange(new DataColumn[]{

new DataColumn("Id",typeof(int)),

new DataColumn("UserName",typeof(string)),

new DataColumn("Pwd",typeof(string))});

return dt;

}

static void Main(string[] args)

{

Stopwatch sw = new Stopwatch();

for (int multiply = 0; multiply < 10; multiply++)

{

DataTable dt = TableValued.GetTableSchema();

for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)

{

DataRow r = dt.NewRow();

r[0] = count;

r[1] = string.Format("User-{0}", count * multiply);

r[2] = string.Format("Pwd-{0}", count * multiply);

dt.Rows.Add(r);

}

sw.Start();

TableValued.TableValuedToDB(dt);

sw.Stop();

Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));

}

Console.ReadLine();

}

耗时图如下:



比Bulk还快5秒。

SQLBulkCopy,用于数据库之间大批量的数据传递。通常用于新,旧数据库之间数据的更新。即使表结构完全不同,也可以通过字段间的对应关系,顺利的将数据导过来。

首先,SQLBulkCopy需要2个连接。分别连接到不同的旧表所在的数据库,新表所在的数据库。

其次,我们要从旧数据库中,把导出的字段读取出来。用什么读呢?可以用Datatable,也可以用SqlDataReader。因为SqlDataReader不占用内存,对大批量的数据复制,不需要事先导入到系统。所以就用SqlDataReader了。

读出后,设定对应关系,设定目标表名,写入。就这么简单。速度非常快!

初始化Connection对象

SqlConnection ConnectionNew=new SqlConnection("连接信息");

SqlConnection ConnectionOld=new SqlConnection("连接信息");

try

{

//1.在旧表中,用SqlDataReader读取出信息

SqlCommand cmd = new SqlCommand(SQL, ConnectionOld);

sdr = cmd.ExecuteReader();

//2.初始化SqlBulkCopy对象,用新的连接作为参数。

SqlBulkCopy bulkCopy = new SqlBulkCopy(ConnectionNew);

//3.写对应关系。如旧表的People列的数据,对应新表Human列,那么就写bulkCopy.ColumnMappings.Add("People","Human")

//如果两张表的结构一样,那么对应关系就不用写了。

//我是用哈希表存储对应关系的,哈希表作为参数到传入方法中,key的值用来存储旧表的字段名,VALUE的值用来存储新表的值

foreach (string str in HTDuiYing.Keys)

{

bulkCopy.ColumnMappings.Add(str, HTDuiYing[str].ToString());

}

//4.设置目标表名

bulkCopy.DestinationTableName = TableNmae;

//额外,可不写:设置一次性处理的行数。这个行数处理完后,会激发SqlRowsCopied()方法。默认为1

bulkCopy.NotifyAfter = 10;

//额外,可不写:设置激发的SqlRowsCopied()方法,这里为bulkCopy_SqlRowsCopied

bulkCopy.SqlRowsCopied += new SqlRowsCopiedEventHandler(bulkCopy_SqlRowsCopied);

//OK,开始传数据!

bulkCopy.WriteToServer(sdr);

}

//激发的方法写在外头

private void bulkCopy_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)

{

执行的内容。

这里有2个元素值得拿来用

e.RowsCopied,返回数值类型,表示当前已经复制的行数

e.Abort,用于赋值true or false,用于停止赋值的操作

}

由于不同批次在不同事务中执行,因此,如果在批量复制操作期间发生错误,则当前批次中的所有行都将被回滚,但以前批次中的行将保留在数据库中。

比如:批量复制100条数据到数据库汇总,batchsize设置为10.则没10条数据复制作为一个事务,整个100条数据的复制操作被分割为10个独立的事务。如果复制到第56条数据时(第6个事务),出错了。则前50条数据提交到数据库中,只回滚出错的事务。

如果由于发生错误而需要回滚整个批量复制操作,或者批量复制应作为更大的可回滚进程的一部分执行,则可以将 SQLTransaction 对象提供给 SqlBulkCopy 构造函数.

示例:

using (SqlConnection destinationConnection = new SqlConnection(connectionString))

{

destinationConnection.Open();

using (SqlTransaction transaction = destinationConnection.BeginTransaction())

{

using (SqlBulkCopy bulkCopy = new SqlBulkCopy( destinationConnection, SqlBulkCopyOptions.Default, transaction))

{

bulkCopy.BatchSize = 10;

bulkCopy.DestinationTableName = "dbo.BulkCopyDemoMatchingColumns";

try

{

bulkCopy.WriteToServer(reader);

transaction.Commit();

}

catch (Exception ex)

{

Console.WriteLine(ex.Message);

transaction.Rollback();

}

finally

{

reader.Close();

}

}

}

}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: