/// <summary>
/// Reserves a range of sequence numbers by incrementing the "next" field of the
/// "TaoBaoItem" document in the "Counters" collection.
/// </summary>
/// <param name="size">Amount to add to the counter.</param>
/// <returns>The value of the counter's "next" field in the document returned by the update.</returns>
private Int64 SeqInt(int size)
{
    var counters = db.GetCollection<BsonDocument>("Counters");

    // NOTE(review): Query.And wraps a single clause here; kept as-is to preserve the exact query.
    var counterFilter = Query.And(Query.EQ("_id", "TaoBaoItem"));
    var order = SortBy.Descending("next");

    // The increment is cast to Int64 so that "next" is stored as a 64-bit value.
    var increment = Update.Inc("next", (Int64)size);

    // The two boolean flags are returnNew = true and upsert = true in the legacy
    // driver's FindAndModify overload: the post-update document is returned and
    // the counter document is created if it does not exist yet.
    var outcome = jobsFindAndModify(counters, counterFilter, order, increment);

    return outcome.ModifiedDocument.GetValue("next").AsInt64;

    FindAndModifyResult jobsFindAndModify(
        MongoCollection<BsonDocument> collection,
        IMongoQuery filter,
        IMongoSortBy sort,
        IMongoUpdate change)
    {
        return collection.FindAndModify(filter, sort, change, true, true);
    }
}
/// <summary>
/// Builds a single batched query for the stored items whose Iid matches the Iid
/// of any item in the given batch.
/// </summary>
/// <param name="cacheitems">Batch of candidate items; only each item's Iid is read.</param>
/// <returns>A cursor over the matching items, sorted by _id in descending order.</returns>
private MongoCursor<TaoBaoItem> QueryItems(List<TaoBaoItem> cacheitems)
{
    Stopwatch timer = Stopwatch.StartNew();

    // Gather every Iid of the batch so they can all be matched by one "in" query.
    var iids = new BsonArray();
    foreach (var candidate in cacheitems)
    {
        iids.Add(candidate.Iid);
    }

    var byIdDescending = new SortByDocument { { "_id", -1 } };
    var cursor = taoBaoItemRepo.Collection
        .Find(Query.In("Iid", iids))
        .SetSortOrder(byIdDescending);

    timer.Stop();

    // NOTE(review): the cursor is returned without being enumerated here, so this
    // timing presumably covers only building the query, not executing it - confirm.
    WorkMsg("自写查询{0}条耗时:{1} ".FormatWith(cacheitems.Count, timer.ElapsedMilliseconds));

    return cursor;
}
// 业务部分
/// <summary>
/// De-duplication and persistence worker loop. Runs forever: takes a batch of
/// items from <c>ItemCache</c>, drops the ones whose Iid is already known, and
/// inserts the remaining ones in chunks of up to 100.
/// </summary>
/// <remarks>
/// Pending items are kept in <c>Items</c> across batches. They are written out when
/// 100 have accumulated, or once a batch has been fully processed and
/// <c>ItemCache</c> reports no further queued work. The drain check runs after the
/// batch loop, so pending items are still flushed when the last item of a batch is
/// a duplicate or the batch is empty.
/// </remarks>
public void Work()
{
    while (true)
    {
        List<TaoBaoItem> cacheitems = ItemCache.GetLItem();
        if (cacheitems == null)
        {
            // Nothing queued right now; wait before polling again.
            Thread.Sleep(5000);
            continue;
        }

        Stopwatch sw = Stopwatch.StartNew();

        // Register the Iid of every already-stored match so the loop below
        // treats those items as duplicates.
        MongoCursor<TaoBaoItem> rz = QueryItems(cacheitems);
        foreach (TaoBaoItem doc in rz)
        {
            HashCache.Add(doc.Iid);
        }

        foreach (var item in cacheitems)
        {
            // HashCache.Add is used as the "not seen before" test: only items
            // for which it returns true are queued for insertion.
            if (HashCache.Add(item.Iid))
            {
                Items.Add(item);
                if (Items.Count >= 100)
                {
                    FlushPendingItems();
                }
            }
        }

        // End of batch: if no more work is queued, write out whatever is still
        // pending instead of holding it until the next batch arrives.
        if (Items.Count > 0 && ItemCache.Count() == 0)
        {
            FlushPendingItems();
        }

        sw.Stop();
        WorkMsg("{0}条记录处理[去重+入库]总耗时:{1} ".FormatWith(cacheitems.Count, sw.ElapsedMilliseconds));
    }
}

/// <summary>
/// Assigns consecutive DocId values to the pending items, inserts them through the
/// repository, logs the insert timing, and clears the pending list.
/// </summary>
private void FlushPendingItems()
{
    // SeqInt is called with the number of ids needed; the returned value is
    // treated as the end of the reserved range, so the first id is that value
    // minus the count.
    var seqid = SeqInt(Items.Count);
    seqid = seqid - Items.Count;
    foreach (var x in Items)
    {
        x.DocId = seqid;
        seqid++;
    }

    Stopwatch sw1 = Stopwatch.StartNew();
    taoBaoItemRepo.Add(Items);
    sw1.Stop();

    WorkMsg("插入{0}条宝贝耗时:{1} 库内共有宝贝{2} ".FormatWith(Items.Count, sw1.ElapsedMilliseconds, taoBaoItemRepo.Count()));
    Items.Clear();
}