using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // TPL Dataflow blocks (BatchBlock, ActionBlock, ...)

namespace TPL数据流处理数据
{
    /// <summary>
    /// Console sample that generates <see cref="Person"/> rows and inserts them into
    /// SQL Server through TPL Dataflow blocks, with or without batching.
    /// </summary>
    class Program
    {
        /* Table creation script
        CREATE TABLE [dbo].[Persons](
            [ID] [int] NULL,
            [Name] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
            [Age] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
            [Sex] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
            [Addres] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
            [Height] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
            [Weight] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL
        )
        */
        // https://msdn.microsoft.com/zh-cn/vstudio/hh228603(v=vs.96)#predefined_types

        // NOTE(review): credentials are hard-coded in source; move them to configuration
        // or a secret store before using this outside of a local demo.
        static readonly string connectionString = "Data Source=.;Initial Catalog=TplDB;Persist Security Info=True;User ID=sa;Password=xxxxx";

        /// <summary>
        /// One row of the [dbo].[Persons] table, plus the static helpers that generate
        /// and insert such rows.
        /// </summary>
        public class Person
        {
            // Batch size handed to BatchBlock. It is far larger than any realistic row
            // count, so in practice the rows are emitted as a single batch when the
            // block is completed.
            private const int BatchSize = 1024 * 1024 * 1024;

            // Parameterized INSERT statement executed once per person.
            private const string InsertSql = @"INSERT INTO [TplDB].[dbo].[Persons]([ID] ,[Name],[Age] ,[Sex] ,[Addres],[Height],[Weight]) VALUES(@ID,@Name,@Age ,@Sex ,@Addres ,@Height ,@Weight)";

            public int ID { get; set; }
            public string Name { get; set; }
            public int Age { get; set; }
            public string Sex { get; set; }
            public string Addres { get; set; }
            public string Height { get; set; }
            public string Weight { get; set; }

            /// <summary>
            /// Produces a 32-bit seed from the cryptographic random number generator.
            /// </summary>
            /// <returns>Four random bytes interpreted as an <see cref="int"/>.</returns>
            static int RandomSeed()
            {
                byte[] bytes = new byte[4];

                // The provider is IDisposable; release it as soon as the bytes are read.
                using (var rng = new System.Security.Cryptography.RNGCryptoServiceProvider())
                {
                    rng.GetBytes(bytes);
                }

                return BitConverter.ToInt32(bytes, 0);
            }

            /// <summary>
            /// Generates <paramref name="count"/> people with random age, height and
            /// weight and posts each of them to <paramref name="list"/>.
            /// </summary>
            /// <param name="list">Dataflow target that receives the generated people.</param>
            /// <param name="count">Number of people to generate; IDs run from 0 to count - 1.</param>
            /// <returns>The same <paramref name="list"/> instance that was passed in.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="list"/> is null.</exception>
            public static ITargetBlock<Person> GetPerson(ITargetBlock<Person> list, int count)
            {
                if (list == null)
                {
                    throw new ArgumentNullException("list");
                }

                // One generator, seeded once from the crypto RNG, serves the whole loop;
                // re-seeding a new Random for every person adds cost but no randomness.
                Random ran = new Random(RandomSeed());

                for (int i = 0; i < count; i++)
                {
                    list.Post(new Person
                    {
                        ID = i,
                        Name = "Jhon" + i,
                        Age = ran.Next(18, 70),
                        Sex = i % 2 == 0 ? "男" : "女",
                        Addres = "滨江滨河路" + i,
                        Height = ran.Next(155, 210) + "cm",
                        Weight = ran.Next(40, 200) + "kg"
                    });
                }

                return list;
            }

            /// <summary>
            /// Buffered insert: generates <paramref name="count"/> people, groups them
            /// with a <see cref="BatchBlock{T}"/> and inserts each batch using the default
            /// connection string. Blocks until every insert has finished.
            /// </summary>
            /// <param name="count">Number of people to generate and insert.</param>
            public static void ADDPersons(int count)
            {
                var batchs = new BatchBlock<Person>(BatchSize);
                var insertEmployees = new ActionBlock<Person[]>(a => InsertPersons(a));

                // Link batch block to action block and let completion (and faults) flow
                // downstream, instead of wiring a manual continuation.
                batchs.LinkTo(insertEmployees, new DataflowLinkOptions { PropagateCompletion = true });

                // Feed the generated people into the batch block.
                GetPerson(batchs, count);

                // Completing the batch block flushes any partially filled batch; then
                // wait for the insert block to drain.
                batchs.Complete();
                insertEmployees.Completion.Wait();
            }

            /// <summary>
            /// Unbuffered insert: generates <paramref name="count"/> people and inserts
            /// them one at a time, each as its own single-element batch. Blocks until
            /// every insert has finished.
            /// </summary>
            /// <param name="connectionString">
            /// Connection string to insert with. When null or blank, the program's
            /// default connection string is used instead.
            /// </param>
            /// <param name="count">Number of people to generate and insert.</param>
            public static void AddPersons(string connectionString, int count)
            {
                // The parameter shadows Program.connectionString, so qualify the fallback.
                string connection = string.IsNullOrWhiteSpace(connectionString)
                    ? Program.connectionString
                    : connectionString;

                var insertEmployee = new ActionBlock<Person>(e => InsertPersons(new Person[] { e }, connection));

                GetPerson(insertEmployee, count);

                // Signal that no more input is coming, then wait for pending inserts.
                insertEmployee.Complete();
                insertEmployee.Completion.Wait();
            }

            /// <summary>
            /// Inserts every person in <paramref name="list"/> into [dbo].[Persons]
            /// using the default connection string.
            /// </summary>
            /// <param name="list">People to insert, one INSERT statement per element.</param>
            /// <exception cref="ArgumentNullException"><paramref name="list"/> is null.</exception>
            public static void InsertPersons(Person[] list)
            {
                InsertPersons(list, Program.connectionString);
            }

            // Opens one connection and executes the parameterized INSERT once per person.
            private static void InsertPersons(Person[] list, string connection)
            {
                if (list == null)
                {
                    throw new ArgumentNullException("list");
                }

                if (list.Length == 0)
                {
                    // Nothing to insert; do not open a connection for an empty batch.
                    return;
                }

                // using guarantees both objects are disposed on success and on failure.
                // Exceptions propagate unchanged, so the original stack trace is kept.
                using (SqlConnection sqlConn = new SqlConnection(connection))
                using (SqlCommand sqlComm = new SqlCommand(InsertSql, sqlConn))
                {
                    sqlComm.CommandType = CommandType.Text;

                    // Declare the parameters once and only swap values inside the loop.
                    // Strings go over the wire as NVarChar (what value-inferred string
                    // parameters use), so the server converts them with the column's
                    // collation; Age is an int that the server converts to varchar.
                    SqlParameter id = sqlComm.Parameters.Add("@ID", SqlDbType.Int);
                    SqlParameter name = sqlComm.Parameters.Add("@Name", SqlDbType.NVarChar);
                    SqlParameter age = sqlComm.Parameters.Add("@Age", SqlDbType.Int);
                    SqlParameter sex = sqlComm.Parameters.Add("@Sex", SqlDbType.NVarChar);
                    SqlParameter addres = sqlComm.Parameters.Add("@Addres", SqlDbType.NVarChar);
                    SqlParameter height = sqlComm.Parameters.Add("@Height", SqlDbType.NVarChar);
                    SqlParameter weight = sqlComm.Parameters.Add("@Weight", SqlDbType.NVarChar);

                    sqlConn.Open();

                    foreach (Person person in list)
                    {
                        id.Value = person.ID;
                        name.Value = ToDbValue(person.Name);
                        age.Value = person.Age;
                        sex.Value = ToDbValue(person.Sex);
                        addres.Value = ToDbValue(person.Addres);
                        height.Value = ToDbValue(person.Height);
                        weight.Value = ToDbValue(person.Weight);

                        sqlComm.ExecuteNonQuery();
                    }
                }
            }

            // A null .NET string must be sent as DBNull; a null parameter value is
            // otherwise treated as "parameter not supplied".
            private static object ToDbValue(string value)
            {
                return value == null ? (object)DBNull.Value : value;
            }
        }

        /// <summary>
        /// Entry point: inserts 10000 generated people through the buffered pipeline,
        /// then waits for a key press before exiting.
        /// </summary>
        static void Main(string[] args)
        {
            Person.ADDPersons(10000);
            Console.ReadKey();
        }
    }
}