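Here we verify the third approach. The idea is to spread the data to be loaded across all storage nodes. The difference is that loading is driven through the InvocationService provided by the storage nodes rather than through a PreloadRequest.
The principle is shown in the figure.
The prerequisites and observations are: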
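- All keys to be loaded must be known in advance.
- The keys must be divided among the storage nodes according to the number of nodes. This is done by the method Map<Member, List<String>> divideWork(Set members), which takes the Coherence storage members as input and returns a map keyed by member, where each value is the list of entry keys assigned to that member.
- The loading work is done by the run method of MyLoadInvocable, which loads the data on each node. MyLoadInvocable must extend AbstractInvocable and implement PortableObject. I do not know why; when I tried implementing Serializable instead, it failed with an error.
- While splitting the keys, I found that the List<String> already stored in the map was overwritten by later values. Creating a new List each time a value is put into the map avoids this.
- No CacheLoader or CacheStore implementation is needed.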
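The key-splitting step and the overwritten-list problem described above can be sketched without any Coherence dependency. This is a minimal sketch under assumptions: KeyDivider and divide are hypothetical names, members are represented by plain strings instead of Coherence Member objects, and any remainder is spread over the first members rather than given to the last one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Coherence: splits keys into one chunk per member.
class KeyDivider {
    static <M> Map<M, List<String>> divide(List<M> members, List<String> keys) {
        Map<M, List<String>> work = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return work;
        }
        int base = keys.size() / members.size();   // keys every member receives
        int extra = keys.size() % members.size();  // the first `extra` members receive one more
        int from = 0;
        for (int m = 0; m < members.size(); m++) {
            int to = from + base + (m < extra ? 1 : 0);
            // Copy the chunk into a fresh list for each member. Putting one shared
            // list into the map and then calling clear() on it would change the
            // values that were stored earlier.
            work.put(members.get(m), new ArrayList<>(keys.subList(from, to)));
            from = to;
        }
        return work;
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("node-1", "node-2", "node-3");
        List<String> keys = Arrays.asList("1", "2", "3", "4", "5", "6", "7");
        // prints {node-1=[1, 2, 3], node-2=[4, 5], node-3=[6, 7]}
        System.out.println(divide(members, keys));
    }
}
```

Because every map value is its own ArrayList, later chunks cannot alter the keys already assigned to an earlier member.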
Person.java
package dataload;

import java.io.Serializable;

public class Person implements Serializable {
    private String Id;
    private String Firstname;
    private String Lastname;
    private String Address;

    public Person() {
        super();
    }

    public Person(String sId, String sFirstname, String sLastname, String sAddress) {
        Id = sId;
        Firstname = sFirstname;
        Lastname = sLastname;
        Address = sAddress;
    }

    public void setId(String Id) { this.Id = Id; }
    public String getId() { return Id; }

    public void setFirstname(String Firstname) { this.Firstname = Firstname; }
    public String getFirstname() { return Firstname; }

    public void setLastname(String Lastname) { this.Lastname = Lastname; }
    public String getLastname() { return Lastname; }

    public void setAddress(String Address) { this.Address = Address; }
    public String getAddress() { return Address; }
}
MyLoadInvocable.java
This is the loading task. Its run method is invoked on each node and loads that node's share of the data.
package dataload;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Hashtable;
import java.util.List;

import javax.naming.Context;
import javax.naming.InitialContext;

public class MyLoadInvocable extends AbstractInvocable implements PortableObject {
    private List<String> m_memberKeys;
    private String m_cache;

    // Public no-argument constructor, needed when the task is deserialized on the target node.
    public MyLoadInvocable() {
        super();
    }

    public MyLoadInvocable(List<String> memberKeys, String cache) {
        m_memberKeys = memberKeys;
        m_cache = cache;
    }

    // Looks up the "ds" data source through WebLogic JNDI; returns null if the lookup fails.
    public Connection getConnection() {
        Connection con = null;
        try {
            Hashtable<String, String> ht = new Hashtable<String, String>();
            ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
            ht.put(Context.PROVIDER_URL, "t3://localhost:7001");
            Context ctx = new InitialContext(ht);
            javax.sql.DataSource ds = (javax.sql.DataSource) ctx.lookup("ds");
            con = ds.getConnection();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return con;
    }

    // Runs on the target member: reads each assigned key from the database and puts the row into the cache.
    public void run() {
        System.out.println("Enter MyLoadInvocable run....");
        NamedCache cache = CacheFactory.getCache(m_cache);
        String sSQL = "SELECT id, firstname, lastname, address FROM persons WHERE id = ?";
        System.out.println("Enter load= " + sSQL);
        Connection con = getConnection();
        if (con == null) {
            System.out.println("===no database connection");
            return;
        }
        try {
            PreparedStatement stmt = con.prepareStatement(sSQL);
            for (String id : m_memberKeys) {
                System.out.println("===========" + id);
                stmt.setString(1, id);
                ResultSet rslt = stmt.executeQuery();
                if (rslt.next()) {
                    Person person = new Person(rslt.getString("id"), rslt.getString("firstname"),
                            rslt.getString("lastname"), rslt.getString("address"));
                    cache.put(person.getId(), person);
                }
                rslt.close();
            }
            stmt.close();
        } catch (Exception e) {
            System.out.println("===" + e.getMessage());
        } finally {
            // Return the connection to the pool; the original listing never closed it.
            try {
                con.close();
            } catch (Exception e) {
                System.out.println("===" + e.getMessage());
            }
        }
    }

    public void readExternal(PofReader in) throws IOException {
        m_memberKeys = (List<String>) in.readObject(0);
        m_cache = (String) in.readObject(1);
    }

    public void writeExternal(PofWriter out) throws IOException {
        out.writeObject(0, m_memberKeys);
        out.writeObject(1, m_cache);
    }
}
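The run method puts entries into the cache one key at a time. Since NamedCache is a java.util.Map, one possible variation is to collect the loaded rows and write them with putAll in batches, which may reduce the number of cache calls. The following is only a sketch of that idea: BatchLoader, loadInBatches, and batchSize are hypothetical names, a plain Map stands in for the cache, and a Function stands in for the database lookup.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: loads values for the given keys and writes them in batches.
class BatchLoader {
    static <V> int loadInBatches(List<String> keys, Function<String, V> lookup,
                                 Map<String, V> cache, int batchSize) {
        int loaded = 0;
        Map<String, V> batch = new HashMap<>();
        for (String key : keys) {
            V value = lookup.apply(key);      // stands in for the SELECT ... WHERE id = ?
            if (value == null) {
                continue;                     // no row for this key, skip it
            }
            batch.put(key, value);
            if (batch.size() >= batchSize) {
                cache.putAll(batch);          // one bulk write per full batch
                loaded += batch.size();
                batch = new HashMap<>();      // start a fresh batch
            }
        }
        if (!batch.isEmpty()) {
            cache.putAll(batch);              // flush the last, partially filled batch
            loaded += batch.size();
        }
        return loaded;
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        int n = loadInBatches(Arrays.asList("1", "2", "3"), k -> "person-" + k, cache, 2);
        System.out.println(n + " entries loaded");
    }
}
```

The batch size is a tuning parameter; a suitable value depends on entry size and available memory.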
LoaderUsingEP.java
This is the loading client. It splits the keys into segments, looks up the InvocationService, and dispatches the tasks.
package dataload;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.naming.Context;
import javax.naming.InitialContext;

public class LoaderUsingEP {

    // Looks up the "ds" data source through WebLogic JNDI; returns null if the lookup fails.
    public Connection getConnection() {
        Connection con = null;
        try {
            Hashtable<String, String> ht = new Hashtable<String, String>();
            ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
            ht.put(Context.PROVIDER_URL, "t3://localhost:7001");
            Context ctx = new InitialContext(ht);
            javax.sql.DataSource ds = (javax.sql.DataSource) ctx.lookup("ds");
            con = ds.getConnection();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return con;
    }

    protected Set getStorageMembers(NamedCache cache) {
        return ((PartitionedService) cache.getCacheService()).getOwnershipEnabledMembers();
    }

    // Reads every id from the persons table and assigns a contiguous chunk to each member.
    protected Map<Member, List<String>> divideWork(Set members) {
        Map<Member, List<String>> mapWork = new HashMap<Member, List<String>>(members.size());
        List<String> ids = new ArrayList<String>();
        Connection con = getConnection();
        try {
            Statement st = con.createStatement();
            ResultSet rs = st.executeQuery("select id from persons");
            while (rs.next()) {
                ids.add(rs.getString("id"));
            }
            rs.close();
            st.close();
        } catch (Exception e) {
            System.out.println("Exception...." + e.getMessage());
        } finally {
            try {
                if (con != null) {
                    con.close();
                }
            } catch (Exception e) {
                System.out.println("Exception...." + e.getMessage());
            }
        }

        int membercount = members.size();
        if (membercount == 0) {
            return mapWork;
        }
        int onecount = ids.size() / membercount;
        int from = 0;
        int worker = 0;
        for (Object o : members) {
            worker++;
            // The last member also takes the remainder.
            int to = (worker == membercount) ? ids.size() : from + onecount;
            // A new list per member, so that later chunks cannot overwrite earlier ones.
            mapWork.put((Member) o, new ArrayList<String>(ids.subList(from, to)));
            from = to;
        }

        for (Map.Entry<Member, List<String>> entry : mapWork.entrySet()) {
            System.out.println("final=" + entry.getKey());
            for (String key : entry.getValue()) {
                System.out.println(key);
            }
        }
        return mapWork;
    }

    public void load() {
        NamedCache cache = CacheFactory.getCache("SampleCache");
        Set members = getStorageMembers(cache);
        System.out.println("members" + members.size());
        Map<Member, List<String>> mapWork = divideWork(members);
        InvocationService service = (InvocationService) CacheFactory.getService("LocalInvocationService");
        for (Map.Entry<Member, List<String>> entry : mapWork.entrySet()) {
            Member member = entry.getKey();
            List<String> memberKeys = entry.getValue();
            System.out.println(memberKeys.size());
            MyLoadInvocable task = new MyLoadInvocable(memberKeys, cache.getCacheName());
            // Asynchronous call; no observer is registered, so completion is not reported back.
            service.execute(task, Collections.singleton(member), null);
        }
    }

    public static void main(String[] args) {
        LoaderUsingEP ep = new LoaderUsingEP();
        ep.load();
    }
}
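The load method calls service.execute with a null observer, which is asynchronous, so the client is not told when each node has finished. The InvocationService interface also offers a synchronous query(task, members) call that returns a map of results per member. The general pattern of dispatching one task per member and waiting for every result can be sketched with the JDK alone. This is an illustration under assumptions: DispatchSketch and dispatch are hypothetical names, an ExecutorService stands in for the InvocationService, strings stand in for members, and each task simply reports how many keys it was given.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the client: one task per member, then wait for all of them.
class DispatchSketch {
    static Map<String, Integer> dispatch(Map<String, List<String>> work) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, work.size()));
        try {
            // Submit one task per member without waiting in between.
            Map<String, Future<Integer>> pending = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> e : work.entrySet()) {
                final List<String> keys = e.getValue();
                Callable<Integer> task = () -> keys.size(); // a real task would load the keys
                pending.put(e.getKey(), pool.submit(task));
            }
            // Block until every member has reported its result.
            Map<String, Integer> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<Integer>> e : pending.entrySet()) {
                results.put(e.getKey(), e.getValue().get());
            }
            return results;
        } catch (Exception ex) {
            throw new IllegalStateException("a load task failed", ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> work = new LinkedHashMap<>();
        work.put("node-1", Arrays.asList("1", "2", "3"));
        work.put("node-2", Arrays.asList("4", "5"));
        // prints {node-1=3, node-2=2}
        System.out.println(dispatch(work));
    }
}
```

Collecting a per-member count like this lets the client confirm that the number of loaded entries matches the number of keys it handed out.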
Cache configuration required on the client
storage-override-client.xml
<?xml version="1.0"?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
  <caching-scheme-mapping>
    <!-- Map SampleCache to the distributed-pof scheme. -->
    <cache-mapping>
      <cache-name>SampleCache</cache-name>
      <scheme-name>distributed-pof</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <!-- DB Backed Distributed caching scheme. The client holds no data (local-storage is false). -->
    <distributed-scheme>
      <scheme-name>distributed-pof</scheme-name>
      <service-name>DistributedCache</service-name>
      <backing-map-scheme>
        <read-write-backing-map-scheme>
          <internal-cache-scheme>
            <local-scheme/>
          </internal-cache-scheme>
          <cachestore-scheme>
            <class-scheme>
              <class-name>dataload.DBCacheStore</class-name>
              <init-params>
                <init-param>
                  <param-type>java.lang.String</param-type>
                  <param-value>persons</param-value>
                </init-param>
              </init-params>
            </class-scheme>
          </cachestore-scheme>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <listener/>
      <autostart>true</autostart>
      <local-storage>false</local-storage>
    </distributed-scheme>
    <!-- Invocation service that LoaderUsingEP looks up by name. -->
    <invocation-scheme>
      <scheme-name>my-invocation</scheme-name>
      <service-name>LocalInvocationService</service-name>
      <thread-count>5</thread-count>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>
Cache configuration for the storage nodes
<?xml version="1.0"?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
  <caching-scheme-mapping>
    <!-- Map SampleCache to the distributed-pof scheme. -->
    <cache-mapping>
      <cache-name>SampleCache</cache-name>
      <scheme-name>distributed-pof</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <!-- DB Backed Distributed caching scheme. Storage nodes hold the data (local-storage is true). -->
    <distributed-scheme>
      <scheme-name>distributed-pof</scheme-name>
      <service-name>DistributedCache</service-name>
      <backing-map-scheme>
        <read-write-backing-map-scheme>
          <internal-cache-scheme>
            <local-scheme/>
          </internal-cache-scheme>
          <cachestore-scheme>
            <class-scheme>
              <class-name>dataload.DBCacheStore</class-name>
              <init-params>
                <init-param>
                  <param-type>java.lang.String</param-type>
                  <param-value>persons</param-value>
                </init-param>
              </init-params>
            </class-scheme>
          </cachestore-scheme>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <listener/>
      <autostart>true</autostart>
      <local-storage>true</local-storage>
    </distributed-scheme>
    <!-- Invocation service on which the MyLoadInvocable tasks run. -->
    <invocation-scheme>
      <scheme-name>my-invocation</scheme-name>
      <service-name>LocalInvocationService</service-name>
      <thread-count>5</thread-count>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>
Output
The output shows that the data is loaded in segments, one per storage node.