目录
一、Sql Server插入方案介绍
关于 SqlServer
批量插入的方式,有三种比较常用的插入方式,Insert
、BatchInsert
、SqlBulkCopy
,下面我们对比以下三种方案的速度
1.普通的Insert
插入方法
public static void Insert(IEnumerablepersons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); foreach (var person in persons) { using (var com = new SqlCommand( "INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)", con)) { com.Parameters.AddRange(new[] { new SqlParameter("@Id", SqlDbType.BigInt) {Value = person.Id}, new SqlParameter("@Name", SqlDbType.VarChar, 64) {Value = person.Name}, new SqlParameter("@Age", SqlDbType.Int) {Value = person.Age}, new SqlParameter("@CreateTime", SqlDbType.DateTime) {Value = person.CreateTime ?? (object) DBNull.Value}, new SqlParameter("@Sex", SqlDbType.Int) {Value = (int)person.Sex}, }); com.ExecuteNonQuery(); } } } }
2.拼接BatchInsert
插入语句
public static void BatchInsert(Person[] persons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); var pageCount = (persons.Length - 1) / 1000 + 1; for (int i = 0; i < pageCount; i++) { var personList = persons.Skip(i * 1000).Take(1000).ToArray(); var values = personList.Select(p => $"({p.Id},'{p.Name}',{p.Age},{(p.CreateTime.HasValue ? $"'{p.CreateTime:yyyy-MM-dd HH:mm:ss}'" : "NULL")},{(int) p.Sex})"); var insertSql = $"INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(",", values)}"; using (var com = new SqlCommand(insertSql, con)) { com.ExecuteNonQuery(); } } } }
3.SqlBulkCopy
插入方案
public static void BulkCopy(IEnumerablepersons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); var table = new DataTable(); table.Columns.AddRange(new [] { new DataColumn("Id", typeof(long)), new DataColumn("Name", typeof(string)), new DataColumn("Age", typeof(int)), new DataColumn("CreateTime", typeof(DateTime)), new DataColumn("Sex", typeof(int)), }); foreach (var p in persons) { table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex}); } using (var copy = new SqlBulkCopy(con)) { copy.DestinationTableName = "Person"; copy.WriteToServer(table); } } }
3.三种方案速度对比
方案 | 数量 | 时间 |
---|---|---|
Insert | 1千条 | 145.4351ms |
BatchInsert | 1千条 | 103.9061ms |
SqlBulkCopy | 1千条 | 7.021ms |
Insert | 1万条 | 1501.326ms |
BatchInsert | 1万条 | 850.6274ms |
SqlBulkCopy | 1万条 | 30.5129ms |
Insert | 10万条 | 13875.4934ms |
BatchInsert | 10万条 | 8278.9056ms |
SqlBulkCopy | 10万条 | 314.8402ms |
两者插入效率对比,Insert
明显比SqlBulkCopy
要慢太多,大概20~40倍性能差距,下面我们将SqlBulkCopy
封装一下,让批量插入更加方便
二、SqlBulkCopy封装代码
1.方法介绍
批量插入扩展方法签名
方法 | 方法参数 | 介绍 |
---|---|---|
BulkCopy | 同步的批量插入方法 | |
SqlConnection connection | sql server 连接对象 | |
IEnumerable |
需要批量插入的数据源 | |
string tableName = null | 插入表名称【为NULL默认为实体名称】 | |
int bulkCopyTimeout = 30 | 批量插入超时时间 | |
int batchSize = 0 | 写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】 | |
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量复制参数 | |
SqlTransaction externalTransaction = null | 执行的事务对象 | |
BulkCopyAsync | 异步的批量插入方法 | |
SqlConnection connection | sql server 连接对象 | |
IEnumerable |
需要批量插入的数据源 | |
string tableName = null | 插入表名称【为NULL默认为实体名称】 | |
int bulkCopyTimeout = 30 | 批量插入超时时间 | |
int batchSize = 0 | 写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】 | |
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量复制参数 | |
SqlTransaction externalTransaction = null | 执行的事务对象 |
这个方法主要解决了两个问题:
- 免去了手动构建
DataTable
或者IDataReader
接口实现类,手动构建的转换比较难以维护,如果修改字段就得把这些地方都进行修改,特别是还需要将枚举类型特殊处理,转换成他的基础类型(默认int
) - 不用亲自创建
SqlBulkCopy
对象,和配置数据库列的映射,和一些属性的配置
此方案也是在我公司中使用,以满足公司的批量插入数据的需求,例如第三方的对账数据此方法使用的是Expression
动态生成数据转换函数,其效率和手写的原生代码差不多,和原生手写代码相比,多余的转换损失很小【最大的性能损失都是在值类型
拆装箱上】
此方案和其他网上的方案有些不同的是:不是将List
先转换成DataTable
,然后写入SqlBulkCopy
的,而是使用一个实现IDataReader
的读取器包装List
,每往SqlBulkCopy
插入一行数据才会转换一行数据
IDataReader
方案和DataTable
方案相比优点
效率高:DataTable
方案需要先完全转换后,才能交由SqlBulkCopy
写入数据库,而IDataReader
方案可以边转换边交给SqlBulkCopy
写入数据库(例如:10万数据插入速度可提升30%)
占用内存少:DataTable
方案需要先完全转换后,才能交由SqlBulkCopy
写入数据库,需要占用大量内存,而IDataReader
方案可以边转换边交给SqlBulkCopy
写入数据库,无须占用过多内存
强大:因为是边写入边转换,而且EnumerableReader
传入的是一个迭代器,可以实现持续插入数据的效果
2.实现原理
① 实体Model与表映射
数据库表代码
CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]
实体类代码
public class Person { public long Id { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime? CreateTime { get; set; } public Gender Sex { get; set; } } public enum Gender { Man = 0, Woman = 1 }
- 创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】
- 创建映射使用的
SqlBulkCopy
类型的ColumnMappings
属性来完成,数据列与数据库中列的映射
//创建批量插入对象 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { foreach (var column in ModelToDataTable.Columns) { //创建字段映射 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } }
② 实体转换成数据行
将数据转换成数据行采用的是:反射
+Expression
来完成
其中反射
是用于获取编写Expression
所需程序类,属性等信息
其中Expression
是用于生成高效转换函数其中ModelToDataTable
类型利用了静态泛型类特性,实现泛型参数的缓存效果
在ModelToDataTable
的静态构造函数中,生成转换函数,获取需要转换的属性信息,并存入静态只读字段中,完成缓存
③ 使用IDataReader插入数据的重载
EnumerableReader
是实现了IDataReader
接口的读取类,用于将模型对象,在迭代器中读取出来,并转换成数据行,可供SqlBulkCopy
读取
SqlBulkCopy
只会调用三个方法:GetOrdinal
、Read
、GetValue
- 其中
GetOrdinal
只会在首行读取每个列所代表序号【需要填写:SqlBulkCopy
类型的ColumnMappings
属性】 - 其中
Read
方法是迭代到下一行,并调用ModelToDataTable
来将模型对象转换成数据行.ToRowData.Invoke() object[]
- 其中
GetValue
方法是获取当前行指定下标位置的值
3.完整代码
扩展方法类
public static class SqlConnectionExtension { ////// 批量复制 /// ///插入的模型对象 /// 需要批量插入的数据源 /// 数据库连接对象 /// 插入表名称【为NULL默认为实体名称】 /// 插入超时时间 /// 写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】 /// 批量复制参数 /// 执行的事务对象 ///插入数量 public static int BulkCopy(this SqlConnection connection, IEnumerable source, string tableName = null, int bulkCopyTimeout = 30, int batchSize = 0, SqlBulkCopyOptions options = SqlBulkCopyOptions.Default, SqlTransaction externalTransaction = null) { //创建读取器 using (var reader = new EnumerableReader (source)) { //创建批量插入对象 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { //插入的表 copy.DestinationTableName = tableName ?? typeof(TModel).Name; //写入数据库一批数量 copy.BatchSize = batchSize; //超时时间 copy.BulkCopyTimeout = bulkCopyTimeout; //创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】 foreach (var column in ModelToDataTable .Columns) { //创建字段映射 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } //将数据批量写入数据库 copy.WriteToServer(reader); //返回插入数据数量 return reader.Depth; } } } /// /// 批量复制-异步 /// ///插入的模型对象 /// 需要批量插入的数据源 /// 数据库连接对象 /// 插入表名称【为NULL默认为实体名称】 /// 插入超时时间 /// 写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】 /// 批量复制参数 /// 执行的事务对象 ///插入数量 public static async TaskBulkCopyAsync (this SqlConnection connection, IEnumerable source, string tableName = null, int bulkCopyTimeout = 30, int batchSize = 0, SqlBulkCopyOptions options = SqlBulkCopyOptions.Default, SqlTransaction externalTransaction = null) { //创建读取器 using (var reader = new EnumerableReader (source)) { //创建批量插入对象 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { //插入的表 copy.DestinationTableName = tableName ?? typeof(TModel).Name; //写入数据库一批数量 copy.BatchSize = batchSize; //超时时间 copy.BulkCopyTimeout = bulkCopyTimeout; //创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】 foreach (var column in ModelToDataTable .Columns) { //创建字段映射 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } //将数据批量写入数据库 await copy.WriteToServerAsync(reader); //返回插入数据数量 return reader.Depth; } } } }
封装的迭代器数据读取器
////// 迭代器数据读取器 /// ///模型类型 public class EnumerableReader: IDataReader { /// /// 实例化迭代器读取对象 /// /// 模型源 public EnumerableReader(IEnumerablesource) { _source = source ?? throw new ArgumentNullException(nameof(source)); _enumerable = source.GetEnumerator(); } private readonly IEnumerable _source; private readonly IEnumerator _enumerable; private object[] _currentDataRow = Array.Empty
模型对象转数据行工具类
////// 对象转换成DataTable转换类 /// ///泛型类型 public static class ModelToDataTable{ static ModelToDataTable() { //如果需要剔除某些列可以修改这段代码 var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray(); Columns = new ReadOnlyCollection (propertyList .Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray()); //生成对象转数据行委托 ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList); } /// /// 构建转换成数据行委托 /// /// 传入类型 /// 转换的属性 ///转换数据行委托 private static FuncBuildToRowDataDelegation(Type type, PropertyInfo[] propertyList) { var source = Expression.Parameter(type); var items = propertyList.Select(property => ConvertBindPropertyToData(source, property)); var array = Expression.NewArrayInit(typeof(object), items); var lambda = Expression.Lambda >(array, source); return lambda.Compile(); } /// /// 将属性转换成数据 /// /// 源变量 /// 属性信息 ///获取属性数据表达式 private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property) { var propertyType = property.PropertyType; var expression = (Expression)Expression.Property(source, property); if (propertyType.IsEnum) expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType()); return Expression.Convert(expression, typeof(object)); } ////// 获取数据类型 /// /// 属性类型 ///数据类型 private static Type GetDataType(Type type) { //枚举默认转换成对应的值类型 if (type.IsEnum) return type.GetEnumUnderlyingType(); //可空类型 if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>)) return GetDataType(type.GetGenericArguments().First()); return type; } ////// 列集合 /// public static IReadOnlyListColumns { get; } /// /// 对象转数据行委托 /// public static FuncToRowData { get; } /// /// 集合转换成DataTable /// /// 集合 /// 表名称 ///转换完成的DataTable public static DataTable ToDataTable(IEnumerablesource, string tableName = "TempTable") { //创建表对象 var table = new DataTable(tableName); //设置列 foreach (var dataColumn in Columns) { table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType)); } //循环转换每一行数据 foreach (var item in source) { table.Rows.Add(ToRowData.Invoke(item)); } //返回表对象 return table; } }
三、测试封装代码
1.测试代码
创表代码
CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]
实体类代码
定义的实体的属性名称需要和SqlServer
列名称类型对应
public class Person { public long Id { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime? CreateTime { get; set; } public Gender Sex { get; set; } } public enum Gender { Man = 0, Woman = 1 }
测试方法
//生成10万条数据 var persons = new Person[100000]; var random = new Random(); for (int i = 0; i < persons.Length; i++) { persons[i] = new Person { Id = i + 1, Name = "张三" + i, Age = random.Next(1, 128), Sex = (Gender)random.Next(2), CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i) }; } //创建数据库连接 using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { conn.Open(); var sw = Stopwatch.StartNew(); //批量插入数据 var qty = conn.BulkCopy(persons); sw.Stop(); Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms"); }
执行批量插入结果
226.4767ms
请按任意键继续. . .
四、代码下载
GitHub代码地址:https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。
本文地址:/shujuku/MsSQL/97008.html