上一篇我们说了如何向elasticsearch中创建索引和插入数据(https://www.cnblogs.com/zpy1993-09/p/13380197.html)
今天在做一个延伸,上次说的是单行插入数据,如果数据量小的话还可以,如果大的话,那单行 插入就不能满足需求了,必须要批量插入。如果做批量插入,一般都是做个缓存,到一定时间,或者一定条数然后批量插入一次,如果按一定时间那就是定时缓存,比如数据量很大是,10分钟把缓存的数据批量插入。如果按条数那就是如果缓存1000条插入一次。具体情况要根据数据需求及时性了。
今天我们就说一下如何按一定条数批量插入。那么缓存应该怎么做呢,方式很多。你可以见一个消息队列,或者栈,字典,数组都行。
为了方便理解,我今天就用大家最常见的List数组来做一个缓存。
由于我们是通过API接口往elasticsearch中批量送数据的,所以要做一个List数组的缓存,首先要考虑List的实例不能被覆盖,也就是说每次调用一次接口插入数据,List数组的实例都要是同一个。不然每调用一个接口都要new 一个新的List对象,那还怎么做缓存 。所以我们首先要保证List实例一直唯一。所以我们都会想到单利,那就是单例模式。
由于接口一般都是通过json接收的,我们要解析json最好做一个模型的映射。
假设通过接口插入elasticsearch中的数据是学生信息。首先我们建一个简单的学生类。
public class Student { public string name{get;set;}//姓名 public int number{get;set;}//学号 public int age{get;set;}//年龄 }
那我们就要定义一个Student的List数组的单例:
public class ListExample<Student> { private volatile static List<Student> instance = null; private static readonly object _lock = new object(); public static List<Student> GetInstance() { if (instance == null) { lock (_lock) { if (instance == null) { instance = new List<Student>(); } } } return instance; } }
这样是不是就能保证List对象实例的唯一了,但是有些人会问了,这也太麻烦了,如果多了怎么办,我总不能每一个接口我都要做一个单例吧,那岂不是太苦逼了。
的确是这样,所以,我们要做一下优化,把这个单例方法做成通用模式,支持任何对象的实例。所以,我们自然而然的就想到泛型了。
public class ListExample<T> { private volatile static List<T> instance = null; private static readonly object _lock = new object(); public static List<T> GetInstance() { if (instance == null) { lock (_lock) { if (instance == null) { instance = new List<T>(); } } } return instance; } }
这样不就通用了!
public static class ESHelper { public static readonly string url = "http://IP/"; /// <summary> /// 批量插入 /// </summary> /// <param name="obj">出入的数据</param> /// <param name="index">索引</param> public static void ManyInsert(List<Student> obj, string index) { //设置连接字符串,DefaultIndex中的表名要小写 var settings = new ConnectionSettings(new Uri(url)).DefaultIndex(index); var client = new ElasticClient(settings); var ndexResponse = client.IndexMany<Student>(obj); } }
还是和单例一样的问题,我们不可能不通用,所以我们要做的通用,还是泛型:
public static class ESHelper<T> where T : class { public static readonly string url = "http://IP/";//elasticsearch的IP /// <summary> /// 批量插入 /// </summary> /// <param name="obj">出入的数据</param> /// <param name="index">索引</param> public static void ManyInsert(List<T> obj, string index) { //设置连接字符串,DefaultIndex中的表名要小写 var settings = new ConnectionSettings(new Uri(url)).DefaultIndex(index); var client = new ElasticClient(settings); var ndexResponse = client.IndexMany<T>(obj); } }
那我们可以通过向ES送入学生信息演示一下:
/// <summary> /// 向ES中送入数据 /// </summary> /// <param name="Data">json对象</param> /// <returns></returns> [HttpPost("GetLogList")] public string GetLogList([FromBody] object Data) { //json 数组格式:“{“name”:"张三”,"number",123456,"age":20}” JObject LogList = (JObject)JsonConvert.DeserializeObject(Data.ToString());
Student student=new Student{name=LogList["name"],number=LogList["number"],age=LogList["age"]}
ListExample<Student>.GetInstance().Add(student);
if(ListExample<Student>.GetInstance().Count==1000)
{
ESHelper<Student>.ManyInsert(ListExample<Student>.GetInstance(), "Student-" + DateTime.Now.ToString("yyyy-MM"));//批量插入ES中
ListExample<Student>.GetInstance().Clear();//清空缓存
}
}
这样批量向ES数据完毕,是不是很简单。但是通过一定条数批量插入式存在弊端的,比如2000条批量插入一次,如果另一边调用接口,上传了1000条数据,突然死掉的话,那么这缓存的1000条数据就送不到ES中了,所以最好用定时往ES中批量送数据,下一章我们说一下如何定时向ES中批量插入数据。