關於 SqlServer 批量插入的方式,前段時間也有大神給出了好幾種批量插入的方式及對比測試(http://www.rzrgm.cn/jiekzou/p/6145550.html),估計大家也都明白,最佳的方式就是用 SqlBulkCopy。自從LZ把Chloe.ORM開源以後,有不少園友/群友詢問,框架怎麼批量插入數據。我的回答是不支持!最後建議他們用 SqlBulkCopy 的方式插入。在我們公司,我把 SqlBulkCopy 封裝成了一個 Helper 方法,使得批量插入更加方便,以滿足公司內部不少批量插入需求。我也在群裡分享給了他們。因為已經有好幾位朋友諮詢過,所以,我感覺應該還有很多人還沒有自己的一個批量插入方法,因此,LZ今兒給大家分享下我封裝的這個批量插入方法,希望大家喜歡。
先看看封裝後的方法定義:
// Signature overview only: the method body is omitted in this listing, so it does not compile as-is.
public static class SqlConnectionExtension
{
/// <summary>
/// Inserts the rows in modelList into table destinationTableName using SqlBulkCopy.
/// </summary>
/// <typeparam name="TModel">Must declare a property for every column of the destination table.</typeparam>
/// <param name="conn">Connection to the target database.</param>
/// <param name="modelList">The rows to insert.</param>
/// <param name="batchSize">Value for SqlBulkCopy.BatchSize.</param>
/// <param name="destinationTableName">Target table; if null (or empty), the name of TModel is used as destinationTableName.</param>
/// <param name="bulkCopyTimeout">Value for SqlBulkCopy.BulkCopyTimeout; null keeps the SqlBulkCopy default.</param>
/// <param name="externalTransaction">Transaction the copy should run under; may be null.</param>
public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null);
}
上面都有詳細解釋,相信大家一看就會明白,接下來演示下用法及效果:
先創建一個測試的 Users 表:
-- Demo table for the BulkCopy example. Gender holds the C# Gender enum as an int.
CREATE TABLE [dbo].[Users](
    [Id] [uniqueidentifier] NOT NULL,
    [Name] [nvarchar](100) NULL,
    [Gender] [int] NULL,
    [Age] [int] NULL,
    [CityId] [int] NULL,
    [OpTime] [datetime] NULL,
    CONSTRAINT [PK_Users] PRIMARY KEY CLUSTERED
    (
        [Id] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
然後定義一個與表映射的 Model,記住,由於 SqlBulkCopy 的特性,定義的 Model 必須擁有與表所有字段對應的屬性:
/// <summary>
/// Gender of a user; persisted in the int column Users.Gender.
/// </summary>
/// <remarks>
/// Both values are spelled out explicitly because they are stored in the database:
/// reordering or inserting members must not silently change what is written.
/// </remarks>
public enum Gender
{
    Man = 1,
    Woman = 2
}

/// <summary>
/// Model mapped to table dbo.Users. BulkCopy matches columns to properties by name,
/// so every column of the table has a property of the same name here.
/// </summary>
public class User
{
    /// <summary>Primary key (uniqueidentifier).</summary>
    public Guid Id { get; set; }

    /// <summary>User name (nvarchar(100), nullable).</summary>
    public string Name { get; set; }

    /// <summary>Gender, written to the int column as the enum's numeric value.</summary>
    public Gender? Gender { get; set; }

    /// <summary>Age (nullable int).</summary>
    public int? Age { get; set; }

    /// <summary>City id (nullable int).</summary>
    public int? CityId { get; set; }

    /// <summary>Operation time (nullable datetime).</summary>
    public DateTime? OpTime { get; set; }
}
製造些數據,然後就可以直接插入了:
// A few sample rows; every property of User corresponds to a column of dbo.Users.
List<User> usersToInsert = new List<User>
{
    new User { Id = Guid.NewGuid(), Name = "so1", Gender = Gender.Man, Age = 18, CityId = 1, OpTime = DateTime.Now },
    new User { Id = Guid.NewGuid(), Name = "so2", Gender = Gender.Man, Age = 19, CityId = 2, OpTime = DateTime.Now },
    new User { Id = Guid.NewGuid(), Name = "so3", Gender = Gender.Man, Age = 20, CityId = 3, OpTime = DateTime.Now },
    new User { Id = Guid.NewGuid(), Name = "so4", Gender = Gender.Man, Age = 21, CityId = 4, OpTime = DateTime.Now }
};

using (SqlConnection conn = new SqlConnection("Data Source = .;Initial Catalog = Chloe;Integrated Security = SSPI;"))
{
    // conn is passed in closed: BulkCopy opens it for the copy and closes it again afterwards.
    conn.BulkCopy(usersToInsert, 20000, "Users");
}
執行插入後的表數據:
很方便吧,定義好 Model,調用 BulkCopy 方法就能插入了。這個方法主要解決了兩個問題:1.免去手動構造 DataTable 和向 DataTable 填充數據,要知道,SqlBulkCopy 要求 DataTable 的列必須和表列順序一致,如果手動構造 DataTable 的話會使代碼很難維護;2.不用親自 new 出 SqlBulkCopy 對象以及手動給 SqlBulkCopy 對象設置各種值,如 DestinationTableName、BulkCopyTimeout、BatchSize 等,用封裝的方法,直接傳相應的值就好了。接下來貼乾貨,簡單介紹下實現。
先了解 SqlBulkCopy 的定義(部分):
// Excerpt of SqlBulkCopy's public members (declarations only, metadata-style; not compilable as-is).
public sealed class SqlBulkCopy : IDisposable
{
// Constructors. The BulkCopy helper in this article uses SqlBulkCopy(SqlConnection) when no
// transaction is supplied, and the (SqlConnection, SqlBulkCopyOptions, SqlTransaction) overload when one is.
public SqlBulkCopy(SqlConnection connection);
public SqlBulkCopy(string connectionString);
public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions);
public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction);
// Settings. The helper assigns BatchSize, DestinationTableName and (when given) BulkCopyTimeout;
// it does not touch ColumnMappings, EnableStreaming or NotifyAfter.
public int BatchSize { get; set; }
public int BulkCopyTimeout { get; set; }
public SqlBulkCopyColumnMappingCollection ColumnMappings { get; }
public string DestinationTableName { get; set; }
public bool EnableStreaming { get; set; }
public int NotifyAfter { get; set; }
public event SqlRowsCopiedEventHandler SqlRowsCopied;
public void Close();
// Write overloads; the helper only calls WriteToServer(DataTable).
public void WriteToServer(DataRow[] rows);
public void WriteToServer(DataTable table);
public void WriteToServer(IDataReader reader);
public void WriteToServer(DataTable table, DataRowState rowState);
}
我們只需關注 WriteToServer 方法。因為我們的數據源不是數據庫或 Excel,所以我們直接不考慮 WriteToServer(IDataReader reader)。WriteToServer(DataRow[] rows) 直接無視,不多解釋,所以我們只需考慮用 WriteToServer(DataTable table) 就行了。開幹!
一、構造一個結構嚴謹的 DataTable。
由於在不設置 ColumnMappings(即按列序號映射)時,SqlBulkCopy 要求 DataTable 的列必須和表列順序一致,並且不能多也不能少,所以,我們首先要創建一個和目標表字段順序一致的 DataTable,先查出目標表的結構:
/// <summary>
/// Reads the column list of user table <paramref name="tableName"/>, ordered by column id.
/// </summary>
/// <param name="sourceConn">Connection to clone. The clone is opened and disposed here; sourceConn itself is not opened.</param>
/// <param name="tableName">Table name; may be plain, schema-qualified or bracketed (e.g. "Users", "dbo.Users", "[dbo].[Users]").</param>
/// <returns>The table's columns, or an empty list when no user table with that name is found.</returns>
static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName)
{
    // The table name travels as a parameter and is never spliced into the SQL text.
    // OBJECT_ID(@tableName, N'U') matches user tables only and resolves schema-qualified names.
    // Selecting explicit columns avoids an ambiguous "name" column from joining sysobjects.
    const string sql = "select c.name, c.colorder from syscolumns c where c.id = OBJECT_ID(@tableName, N'U') order by c.colid asc";

    List<SysColumn> columns = new List<SysColumn>();
    // NOTE(review): the clone is a separate connection, so it does not share any transaction open on sourceConn.
    using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone())
    using (SqlCommand cmd = conn.CreateCommand())
    {
        cmd.CommandText = sql;
        // nvarchar(4000) is ample for a (multi-part) object name; null behaves like "no such table".
        cmd.Parameters.Add("@tableName", SqlDbType.NVarChar, 4000).Value = (object)tableName ?? DBNull.Value;

        conn.Open();
        using (SqlDataReader reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                SysColumn column = new SysColumn();
                column.Name = reader.GetString(0);
                // colorder is a smallint, so convert instead of calling GetInt32.
                column.ColOrder = Convert.ToInt32(reader.GetValue(1));
                columns.Add(column);
            }
        }
    }

    return columns;
}
得到基本的表結構 List<SysColumn>,再創建“嚴格”的 DataTable 對象:
// Lay out the DataTable exactly like the destination table: same columns, same order,
// each DataColumn typed after the model property mapped to it.
DataTable dt = new DataTable();
Type modelType = typeof(TModel);
List<SysColumn> columns = GetTableColumns(conn, tableName);
List<PropertyInfo> mappingProps = new List<PropertyInfo>(columns.Count);
PropertyInfo[] props = modelType.GetProperties();
for (int i = 0; i < columns.Count; i++)
{
    SysColumn column = columns[i];
    PropertyInfo mappingProp = props.FirstOrDefault(a => a.Name == column.Name);
    if (mappingProp == null)
        throw new InvalidOperationException(string.Format("model 類型 '{0}'未定義與表 '{1}' 列名為 '{2}' 映射的屬性", modelType.FullName, tableName, column.Name));
    mappingProps.Add(mappingProp);

    // DataColumn does not accept Nullable<T>, so unwrap it. Enums are stored as their
    // underlying integral type, which is not necessarily int (byte, short, long, ...).
    Type dataType = GetUnderlyingType(mappingProp.PropertyType);
    if (dataType.IsEnum)
        dataType = Enum.GetUnderlyingType(dataType);
    dt.Columns.Add(new DataColumn(column.Name, dataType));
}
注意,構造 DataColumn 時,要給 Column 設置 DataType,即數據類型。因為如果不指定數據類型,默認是 string 類型,那樣在將數據發送至數據庫時會引起數據轉換,帶來些許無謂的性能損耗;同時,如果不指定數據類型,導入某些數據類型時可能會失敗,比如模型屬性是 Guid 類型,導入時會出現類型轉換失敗異常。
二、利用反射,獲取屬性值,構造一行一行的 DataRow,填充 DataTable:
// Per column: the enum's underlying integral type when the mapped property is an enum
// (or nullable enum), otherwise null. Computed once here instead of once per cell.
Type[] enumStorageTypes = new Type[mappingProps.Count];
for (int i = 0; i < mappingProps.Count; i++)
{
    Type propType = GetUnderlyingType(mappingProps[i].PropertyType);
    enumStorageTypes[i] = propType.IsEnum ? Enum.GetUnderlyingType(propType) : null;
}

// One DataRow per model; cell i holds the value of mappingProps[i].
foreach (var model in modelList)
{
    DataRow dr = dt.NewRow();
    for (int i = 0; i < mappingProps.Count; i++)
    {
        object value = mappingProps[i].GetValue(model);

        // Unboxing an enum directly to int only works for int-backed enums;
        // Convert.ChangeType handles every underlying type.
        if (value != null && enumStorageTypes[i] != null)
            value = Convert.ChangeType(value, enumStorageTypes[i]);

        dr[i] = value ?? DBNull.Value;
    }
    dt.Rows.Add(dr);
}
三、一個完整包含數據的 DataTable 對象就創建好了,我們就可以使用 SqlBulkCopy 插入數據了:
/// <summary>
/// Inserts <paramref name="modelList"/> into <paramref name="destinationTableName"/> using SqlBulkCopy.
/// </summary>
/// <typeparam name="TModel">Must declare a property, with the same name, for every column of the destination table.</typeparam>
/// <param name="conn">Connection to the target database. If it is not open, it is opened for the copy and closed again afterwards.</param>
/// <param name="modelList">The rows to insert.</param>
/// <param name="batchSize">Value assigned to SqlBulkCopy.BatchSize.</param>
/// <param name="destinationTableName">Target table; if null or empty, the name of <typeparamref name="TModel"/> is used.</param>
/// <param name="bulkCopyTimeout">Value assigned to SqlBulkCopy.BulkCopyTimeout; null keeps the SqlBulkCopy default.</param>
/// <param name="externalTransaction">Transaction the copy runs under; null for none.</param>
/// <exception cref="ArgumentNullException"><paramref name="conn"/> or <paramref name="modelList"/> is null.</exception>
public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null)
{
    if (conn == null)
        throw new ArgumentNullException("conn");
    if (modelList == null)
        throw new ArgumentNullException("modelList");

    if (string.IsNullOrEmpty(destinationTableName))
        destinationTableName = typeof(TModel).Name;

    DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName);

    bool shouldCloseConnection = false;
    try
    {
        // Only a connection opened here is closed again in finally.
        if (conn.State != ConnectionState.Open)
        {
            shouldCloseConnection = true;
            conn.Open();
        }

        // A null externalTransaction means the copy runs without an external transaction.
        using (SqlBulkCopy sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction))
        {
            sbc.BatchSize = batchSize;
            sbc.DestinationTableName = destinationTableName;
            if (bulkCopyTimeout != null)
                sbc.BulkCopyTimeout = bulkCopyTimeout.Value;

            sbc.WriteToServer(dtToWrite);
        }
    }
    finally
    {
        if (shouldCloseConnection && conn.State == ConnectionState.Open)
            conn.Close();
    }
}
完事,一個批量插入的 Helper 方法就這麼產生了,最終的完整實現如下:
/// <summary>
/// SqlConnection extension methods for bulk-inserting a list of model objects with SqlBulkCopy.
/// </summary>
public static class SqlConnectionExtension
{
    /// <summary>
    /// Inserts <paramref name="modelList"/> into <paramref name="destinationTableName"/> using SqlBulkCopy.
    /// </summary>
    /// <typeparam name="TModel">Must declare a public property, with the same name, for every column of the destination table.</typeparam>
    /// <param name="conn">Connection to the target database. If it is not open, it is opened for the copy and closed again afterwards.</param>
    /// <param name="modelList">The rows to insert.</param>
    /// <param name="batchSize">Value assigned to SqlBulkCopy.BatchSize.</param>
    /// <param name="destinationTableName">Target table (plain or schema-qualified, e.g. "dbo.Users"); if null or empty, the name of <typeparamref name="TModel"/> is used.</param>
    /// <param name="bulkCopyTimeout">Value assigned to SqlBulkCopy.BulkCopyTimeout; null keeps the SqlBulkCopy default.</param>
    /// <param name="externalTransaction">Transaction the copy runs under; null for none.</param>
    /// <exception cref="ArgumentNullException"><paramref name="conn"/> or <paramref name="modelList"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><typeparamref name="TModel"/> has no property for one of the table's columns.</exception>
    public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null)
    {
        if (conn == null)
            throw new ArgumentNullException("conn");
        if (modelList == null)
            throw new ArgumentNullException("modelList");

        if (string.IsNullOrEmpty(destinationTableName))
            destinationTableName = typeof(TModel).Name;

        DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName);

        bool shouldCloseConnection = false;
        try
        {
            // Only a connection opened here is closed again in finally.
            if (conn.State != ConnectionState.Open)
            {
                shouldCloseConnection = true;
                conn.Open();
            }

            // A null externalTransaction means the copy runs without an external transaction.
            using (SqlBulkCopy sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction))
            {
                sbc.BatchSize = batchSize;
                sbc.DestinationTableName = destinationTableName;
                if (bulkCopyTimeout != null)
                    sbc.BulkCopyTimeout = bulkCopyTimeout.Value;

                sbc.WriteToServer(dtToWrite);
            }
        }
        finally
        {
            if (shouldCloseConnection && conn.State == ConnectionState.Open)
                conn.Close();
        }
    }

    /// <summary>
    /// Builds a DataTable whose columns follow the column order of table <paramref name="tableName"/>
    /// and fills it with one row per element of <paramref name="modelList"/>.
    /// </summary>
    /// <param name="modelList">Objects to convert; each becomes one DataRow.</param>
    /// <param name="conn">Connection whose clone is used to read the table's column list.</param>
    /// <param name="tableName">Table whose columns define the DataTable layout.</param>
    /// <returns>
    /// The filled DataTable. Each DataColumn is typed after the mapped property (Nullable unwrapped,
    /// enums stored as their underlying integral type); null property values become DBNull.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="modelList"/> or <paramref name="conn"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><typeparamref name="TModel"/> has no property named after one of the table's columns.</exception>
    public static DataTable ToSqlBulkCopyDataTable<TModel>(List<TModel> modelList, SqlConnection conn, string tableName)
    {
        if (modelList == null)
            throw new ArgumentNullException("modelList");
        if (conn == null)
            throw new ArgumentNullException("conn");

        DataTable dt = new DataTable();
        Type modelType = typeof(TModel);

        List<SysColumn> columns = GetTableColumns(conn, tableName);
        List<PropertyInfo> mappingProps = new List<PropertyInfo>(columns.Count);
        // Per column: the enum's underlying integral type when the property is an enum, otherwise null.
        Type[] enumStorageTypes = new Type[columns.Count];

        PropertyInfo[] props = modelType.GetProperties();
        for (int i = 0; i < columns.Count; i++)
        {
            SysColumn column = columns[i];
            PropertyInfo mappingProp = props.FirstOrDefault(a => a.Name == column.Name);
            if (mappingProp == null)
                throw new InvalidOperationException(string.Format("model 類型 '{0}'未定義與表 '{1}' 列名為 '{2}' 映射的屬性", modelType.FullName, tableName, column.Name));

            mappingProps.Add(mappingProp);

            // DataColumn does not accept Nullable<T>, so unwrap it. Enums are stored as their
            // underlying integral type, which is not necessarily int (byte, short, long, ...).
            Type dataType = GetUnderlyingType(mappingProp.PropertyType);
            if (dataType.IsEnum)
            {
                dataType = Enum.GetUnderlyingType(dataType);
                enumStorageTypes[i] = dataType;
            }
            dt.Columns.Add(new DataColumn(column.Name, dataType));
        }

        foreach (TModel model in modelList)
        {
            DataRow dr = dt.NewRow();
            for (int i = 0; i < mappingProps.Count; i++)
            {
                object value = mappingProps[i].GetValue(model);

                // Unboxing an enum directly to int only works for int-backed enums;
                // Convert.ChangeType handles every underlying type.
                if (value != null && enumStorageTypes[i] != null)
                    value = Convert.ChangeType(value, enumStorageTypes[i]);

                dr[i] = value ?? DBNull.Value;
            }
            dt.Rows.Add(dr);
        }

        return dt;
    }

    /// <summary>
    /// Reads the column list of user table <paramref name="tableName"/>, ordered by column id.
    /// </summary>
    /// <param name="sourceConn">Connection to clone. The clone is opened and disposed here; sourceConn itself is not opened.</param>
    /// <param name="tableName">Table name; may be plain, schema-qualified or bracketed (e.g. "Users", "dbo.Users", "[dbo].[Users]").</param>
    /// <returns>The table's columns, or an empty list when no user table with that name is found.</returns>
    static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName)
    {
        // The table name travels as a parameter and is never spliced into the SQL text.
        // OBJECT_ID(@tableName, N'U') matches user tables only and resolves schema-qualified names.
        // Selecting explicit columns avoids an ambiguous "name" column from joining sysobjects.
        const string sql = "select c.name, c.colorder from syscolumns c where c.id = OBJECT_ID(@tableName, N'U') order by c.colid asc";

        List<SysColumn> columns = new List<SysColumn>();
        // NOTE(review): the clone is a separate connection, so it does not share any transaction open on sourceConn.
        using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone())
        using (SqlCommand cmd = conn.CreateCommand())
        {
            cmd.CommandText = sql;
            // nvarchar(4000) is ample for a (multi-part) object name; null behaves like "no such table".
            cmd.Parameters.Add("@tableName", SqlDbType.NVarChar, 4000).Value = (object)tableName ?? DBNull.Value;

            conn.Open();
            using (SqlDataReader reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    SysColumn column = new SysColumn();
                    column.Name = reader.GetString(0);
                    // colorder is a smallint, so convert instead of calling GetInt32.
                    column.ColOrder = Convert.ToInt32(reader.GetValue(1));
                    columns.Add(column);
                }
            }
        }

        return columns;
    }

    /// <summary>
    /// Returns T for Nullable&lt;T&gt;; any other type is returned unchanged.
    /// </summary>
    static Type GetUnderlyingType(Type type)
    {
        return Nullable.GetUnderlyingType(type) ?? type;
    }

    /// <summary>One row read by GetTableColumns.</summary>
    class SysColumn
    {
        /// <summary>Column name (syscolumns.name).</summary>
        public string Name { get; set; }

        /// <summary>Value of syscolumns.colorder.</summary>
        public int ColOrder { get; set; }
    }
}
/// <summary>
/// SqlConnection extension methods for bulk-inserting a list of model objects with SqlBulkCopy.
/// </summary>
public static class SqlConnectionExtension
{
    /// <summary>
    /// Inserts <paramref name="modelList"/> into <paramref name="destinationTableName"/> using SqlBulkCopy.
    /// </summary>
    /// <typeparam name="TModel">Must declare a public property, with the same name, for every column of the destination table.</typeparam>
    /// <param name="conn">Connection to the target database. If it is not open, it is opened for the copy and closed again afterwards.</param>
    /// <param name="modelList">The rows to insert.</param>
    /// <param name="batchSize">Value assigned to SqlBulkCopy.BatchSize.</param>
    /// <param name="destinationTableName">Target table (plain or schema-qualified, e.g. "dbo.Users"); if null or empty, the name of <typeparamref name="TModel"/> is used.</param>
    /// <param name="bulkCopyTimeout">Value assigned to SqlBulkCopy.BulkCopyTimeout; null keeps the SqlBulkCopy default.</param>
    /// <param name="externalTransaction">Transaction the copy runs under; null for none.</param>
    /// <exception cref="ArgumentNullException"><paramref name="conn"/> or <paramref name="modelList"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><typeparamref name="TModel"/> has no property for one of the table's columns.</exception>
    public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null)
    {
        if (conn == null)
            throw new ArgumentNullException("conn");
        if (modelList == null)
            throw new ArgumentNullException("modelList");

        if (string.IsNullOrEmpty(destinationTableName))
            destinationTableName = typeof(TModel).Name;

        DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName);

        bool shouldCloseConnection = false;
        try
        {
            // Only a connection opened here is closed again in finally.
            if (conn.State != ConnectionState.Open)
            {
                shouldCloseConnection = true;
                conn.Open();
            }

            // A null externalTransaction means the copy runs without an external transaction.
            using (SqlBulkCopy sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction))
            {
                sbc.BatchSize = batchSize;
                sbc.DestinationTableName = destinationTableName;
                if (bulkCopyTimeout != null)
                    sbc.BulkCopyTimeout = bulkCopyTimeout.Value;

                sbc.WriteToServer(dtToWrite);
            }
        }
        finally
        {
            if (shouldCloseConnection && conn.State == ConnectionState.Open)
                conn.Close();
        }
    }

    /// <summary>
    /// Builds a DataTable whose columns follow the column order of table <paramref name="tableName"/>
    /// and fills it with one row per element of <paramref name="modelList"/>.
    /// </summary>
    /// <param name="modelList">Objects to convert; each becomes one DataRow.</param>
    /// <param name="conn">Connection whose clone is used to read the table's column list.</param>
    /// <param name="tableName">Table whose columns define the DataTable layout.</param>
    /// <returns>
    /// The filled DataTable. Each DataColumn is typed after the mapped property (Nullable unwrapped,
    /// enums stored as their underlying integral type); null property values become DBNull.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="modelList"/> or <paramref name="conn"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><typeparamref name="TModel"/> has no property named after one of the table's columns.</exception>
    public static DataTable ToSqlBulkCopyDataTable<TModel>(List<TModel> modelList, SqlConnection conn, string tableName)
    {
        if (modelList == null)
            throw new ArgumentNullException("modelList");
        if (conn == null)
            throw new ArgumentNullException("conn");

        DataTable dt = new DataTable();
        Type modelType = typeof(TModel);

        List<SysColumn> columns = GetTableColumns(conn, tableName);
        List<PropertyInfo> mappingProps = new List<PropertyInfo>(columns.Count);
        // Per column: the enum's underlying integral type when the property is an enum, otherwise null.
        Type[] enumStorageTypes = new Type[columns.Count];

        PropertyInfo[] props = modelType.GetProperties();
        for (int i = 0; i < columns.Count; i++)
        {
            SysColumn column = columns[i];
            PropertyInfo mappingProp = props.FirstOrDefault(a => a.Name == column.Name);
            if (mappingProp == null)
                throw new InvalidOperationException(string.Format("model 類型 '{0}'未定義與表 '{1}' 列名為 '{2}' 映射的屬性", modelType.FullName, tableName, column.Name));

            mappingProps.Add(mappingProp);

            // DataColumn does not accept Nullable<T>, so unwrap it. Enums are stored as their
            // underlying integral type, which is not necessarily int (byte, short, long, ...).
            Type dataType = GetUnderlyingType(mappingProp.PropertyType);
            if (dataType.IsEnum)
            {
                dataType = Enum.GetUnderlyingType(dataType);
                enumStorageTypes[i] = dataType;
            }
            dt.Columns.Add(new DataColumn(column.Name, dataType));
        }

        foreach (TModel model in modelList)
        {
            DataRow dr = dt.NewRow();
            for (int i = 0; i < mappingProps.Count; i++)
            {
                object value = mappingProps[i].GetValue(model);

                // Unboxing an enum directly to int only works for int-backed enums;
                // Convert.ChangeType handles every underlying type.
                if (value != null && enumStorageTypes[i] != null)
                    value = Convert.ChangeType(value, enumStorageTypes[i]);

                dr[i] = value ?? DBNull.Value;
            }
            dt.Rows.Add(dr);
        }

        return dt;
    }

    /// <summary>
    /// Reads the column list of user table <paramref name="tableName"/>, ordered by column id.
    /// </summary>
    /// <param name="sourceConn">Connection to clone. The clone is opened and disposed here; sourceConn itself is not opened.</param>
    /// <param name="tableName">Table name; may be plain, schema-qualified or bracketed (e.g. "Users", "dbo.Users", "[dbo].[Users]").</param>
    /// <returns>The table's columns, or an empty list when no user table with that name is found.</returns>
    static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName)
    {
        // The table name travels as a parameter and is never spliced into the SQL text.
        // OBJECT_ID(@tableName, N'U') matches user tables only and resolves schema-qualified names.
        // Selecting explicit columns avoids an ambiguous "name" column from joining sysobjects.
        const string sql = "select c.name, c.colorder from syscolumns c where c.id = OBJECT_ID(@tableName, N'U') order by c.colid asc";

        List<SysColumn> columns = new List<SysColumn>();
        // NOTE(review): the clone is a separate connection, so it does not share any transaction open on sourceConn.
        using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone())
        using (SqlCommand cmd = conn.CreateCommand())
        {
            cmd.CommandText = sql;
            // nvarchar(4000) is ample for a (multi-part) object name; null behaves like "no such table".
            cmd.Parameters.Add("@tableName", SqlDbType.NVarChar, 4000).Value = (object)tableName ?? DBNull.Value;

            conn.Open();
            using (SqlDataReader reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    SysColumn column = new SysColumn();
                    column.Name = reader.GetString(0);
                    // colorder is a smallint, so convert instead of calling GetInt32.
                    column.ColOrder = Convert.ToInt32(reader.GetValue(1));
                    columns.Add(column);
                }
            }
        }

        return columns;
    }

    /// <summary>
    /// Returns T for Nullable&lt;T&gt;; any other type is returned unchanged.
    /// </summary>
    static Type GetUnderlyingType(Type type)
    {
        return Nullable.GetUnderlyingType(type) ?? type;
    }

    /// <summary>One row read by GetTableColumns.</summary>
    class SysColumn
    {
        /// <summary>Column name (syscolumns.name).</summary>
        public string Name { get; set; }

        /// <summary>Value of syscolumns.colorder.</summary>
        public int ColOrder { get; set; }
    }
}
代碼不多,僅僅150行,大家可以直接拷走拿去用。其中用了反射,估計吃瓜群眾可能不淡定了~哈哈,如果你真有大數據插入需求,這點反射消耗相對大數據插入簡直九牛一毛,微乎其微,放心好了。
最后,感謝大家閱讀至此。如果本文對您有用,還望給個愛心推薦,您的贊賞是我持續分享的動力。也歡迎廣大C#同胞入群交流(群號在頂部),暢談.NET復興大計。


浙公網安備 33010602011771號