项目中用到了Hbase,生产环境服务器用了3台,但是不够稳定,每2天左右,就连不上了。
重启就好了,当然,这是一个历史遗留问题。
我在想,是不是连接没有关闭,每次都是建立新的连接?
瞅瞅Java访问Hbase的代码,都close了额。
原来的Hbase,用Java访问,有add/update、get、getList3个接口。
现在要加上Mongodb存储,尽可能保证Hbase和Mongodb数据同步。
优先使用Mongodb中的数据,其次才使用HBase中的数据。
今后有可能不再使用Hbase。
在项目刚刚启动的时候,需要同步HBase中的数据到Mongodb。
简化代码如下:
public class ProjectDetailClient {
private ProjectDetailHbaseClient hbase = new ProjectDetailHbaseClient();
private ProjectDetailMongodbClient mongodb = new ProjectDetailMongodbClient();
// 2个都增加
public void add(ProjectDetail projectDetail) {
}
}
可以这么理解,原来直接使用ProjectDetailHbaseClient,方法名称都一样。
后台增加了ProjectDetailMongodbClient,方法的实现也一样,可以看作是一套接口的2套实现。
ProjectDetailClient的add等具体方法中,会处理2个接口的调用、数据同步等逻辑问题。
完整代码如下:
package com.hanhai.zrb.api.mongodb; import java.util.List; import org.apache.log4j.Logger; import casia.isiteam.zrb.hbase.client.ProjectDetailHbaseClient; import com.hanhai.zrb.model.project.ProjectDetail; public class ProjectDetailClient { private ProjectDetailHbaseClient hbase = new ProjectDetailHbaseClient(); private ProjectDetailMongodbClient mongodb = new ProjectDetailMongodbClient(); private Logger log = Logger.getLogger(getClass()); // 2个都增加 public void add(ProjectDetail projectDetail) { log.info("Add ProjectDetail for hbase."); hbase.insertProjectDetail(projectDetail); log.info("Add ProjectDetail for mongodb."); mongodb.add(projectDetail); } // 2个都更新 public void update(ProjectDetail projectDetail) { if (projectDetail.getId() == null) { log.error("ProjectDetail is is null,Cantnot update~"); return; } Long id = projectDetail.getId(); ProjectDetail one = mongodb.get(id); // Mongodb,如果存在,更新 if (one != null) { log.info("Update ProjectDetail,Mongodb exists,id="+id); mongodb.update(projectDetail); } // 不存在,就增加 else { log.info("Update ProjectDetail,Mongodb not exists,id="+id); mongodb.add(projectDetail); } // hbase增加和更新是同一个接口 log.info("Update ProjectDetail for hbase,id="+id); hbase.insertProjectDetail(projectDetail); } // 2个都查询,优先使用Mongodb public ProjectDetail get(long id) { ProjectDetail one = null; ProjectDetail hbaseOne = hbase.getProjectDetail(id); ProjectDetail mongodbOne = mongodb.get(id); if (mongodbOne != null) { one = mongodbOne; log.info("Project Detail,Mongodb exists,Use Mongodb," + one); } else if (hbaseOne != null) { one = hbaseOne; log.info("Project Detail,Mongodb not exists,Use Hbase," + one); log.info("Add Project Detail To Mongodb"); // 同步Hbase中的数据到Mongodb mongodb.add(hbaseOne); } return one; } // 2个都查询,优先使用Mongodb public List<ProjectDetail> getProjectInfoBasic(List<Long> idList) { List<ProjectDetail> list = null; List<ProjectDetail> hbaseList = hbase.getProjectInfoBasic(idList); List<ProjectDetail> mongodbList = mongodb.getProjectInfoBasic(idList); // 优先使用Mongodb中的,条件,Mongodb中的个数不小于hbase中的 if (mongodbList != null) { int size = mongodbList.size(); if (hbaseList == null || hbaseList.size() <= size) { list = mongodbList; log.info("ProjectDetail list,Use MongodbList,size=" + size); }else{ list = hbaseList; log.info("ProjectDetail list,Use HbaseList,size=" + hbaseList.size()+",mongodb count "+size+" < hbase count "+hbaseList.size()); } } // 其次使用Hbase中的,不会同步hbase中的数据到Mongodb else if (hbaseList != null) { list = hbaseList; log.info("ProjectDetail list,Use HbaseList,size=" + hbaseList.size()); } return list; } }
package com.hanhai.zrb.api.mongodb; import java.util.ArrayList; import java.util.List; import org.apache.log4j.Logger; import com.hanhai.zrb.model.project.ProjectDetail; import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; import com.mongodb.CommandResult; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.WriteResult; public class ProjectDetailMongodbClient { public static final String CON_NAME = "projectDetail"; private Logger log = Logger.getLogger(getClass()); public void add(ProjectDetail projectDetail) { DBCollection con = getCon(); add(con, projectDetail); } private DBCollection getCon() { DB db = MongoUtil.db(); DBCollection con = db.getCollection(CON_NAME); return con; } // 增加 private DBCollection add(DBCollection projectDetailCollection, ProjectDetail projectDetail) { DBObject object = BeanUtil.bean2DBObject(projectDetail); WriteResult wr = projectDetailCollection.insert(object); CommandResult cr = wr.getLastError(); log.info("Add new projectDetail,result:" + cr); return projectDetailCollection; } public void update(ProjectDetail projectDetail) { DBCollection con = getCon(); update(con, projectDetail); } // 修改 private void update(DBCollection collection, ProjectDetail projectDetail) { if (projectDetail.getId() == null) { log.error("Update projectDetail,must have a unique id"); return; } BasicDBObject updateCondition = new BasicDBObject(); updateCondition.append("id", projectDetail.getId()); DBObject newObject = BeanUtil.bean2DBObject(projectDetail); DBObject updateSetValue = new BasicDBObject("$set", newObject); WriteResult wr = collection.update(updateCondition, updateSetValue); log.info("Update new projectDetail,result:" + wr); } public ProjectDetail get(long id) { DBCollection con = getCon(); ProjectDetail projectDetail = findById(con, id); return projectDetail; } // 从集合中,根据ID查找 private ProjectDetail findById(DBCollection collection, Long id) { BasicDBObject searchProjectDetailById = new BasicDBObject(); searchProjectDetailById.append("id", id); ProjectDetail projectDetailBefore = null; // findOne方法更简单一些 DBCursor cursor = collection.find(searchProjectDetailById); while (cursor.hasNext()) { DBObject articleObject = cursor.next(); if (articleObject != null) { projectDetailBefore = objectToArticle(articleObject); String internalId = articleObject.get("_id").toString(); projectDetailBefore.setMongoId(internalId); } } cursor.close(); return projectDetailBefore; } // 对象转换 private ProjectDetail objectToArticle(DBObject object) { ProjectDetail projectDetail = new ProjectDetail(); // 用工具方法转换,手动转换,需要判断类型,比较麻烦 projectDetail = BeanUtil.dbObject2Bean(object, projectDetail); return projectDetail; } public List<ProjectDetail> getProjectInfoBasic(List<Long> idList) { DBCollection con = getCon(); List<ProjectDetail> list = findByIdList(con, idList); return list; } // 根据ID集合查找 private List<ProjectDetail> findByIdList(DBCollection collection, List<Long> idList) { BasicDBList values = new BasicDBList(); values.addAll(idList); DBObject inQuery = new BasicDBObject("$in", values); DBObject con = new BasicDBObject(); con.put("id", inQuery); DBCursor cursorIdArray = collection.find(con); List<ProjectDetail> projectDetailList = new ArrayList<ProjectDetail>(); while (cursorIdArray.hasNext()) { DBObject articleObject = cursorIdArray.next(); ProjectDetail projectDetail = new ProjectDetail(); BeanUtil.dbObject2Bean(articleObject, projectDetail); String mongoId = articleObject.get("_id").toString(); projectDetail.setMongoId(mongoId); projectDetailList.add(projectDetail); } return projectDetailList; } }
ProjectDetailHbaseClient代码较为复杂,和ProjectDetailMongodbClient类似,不再贴了。