zoukankan      html  css  js  c++  java
  • Coherence装载数据的研究

    这里验证第三个方法,原理是将需要装载的数据分载在所有的存储节点上,不同的地方是利用了存储节点提供的InvocationService进行装载,而不是PreloadRequest,

    原理如图

    前提条件是:

    • 需要知道所有要装载的key值
    • 需要根据存储节点的数目把key值进行分摊,这里是通过
    • Map<Member, List<String>> divideWork(Set members)这个方法,输入Coherence的存储节点成员,输出一个map结构,以member为key,所有的entry key值为value.
    • 装载数据的任务,主要是通过驱动MyLoadInvocable的run方法,把数据在各个节点中进行装载,MyLoadInvocable必须扩展AbstractInvocable并实现PortableObject,不知何解,我尝试实现Seriable方法,结果出错
    • 在拆解所有key值的任务过程中,发现list<String>数组被后面的值覆盖,后来每次放入map的时候新建一个List才避免此现象发生.
    • 不需要实现CacheLoader或者CacheStore方法

    Person.java

    package dataload;

    import java.io.Serializable;

    public class Person implements Serializable {
    private String Id;
    private String Firstname;

    public void setId(String Id) {
    this.Id = Id;
    }

    public String getId() {
    return Id;
    }

    public void setFirstname(String Firstname) {
    this.Firstname = Firstname;
    }

    public String getFirstname() {
    return Firstname;
    }

    public void setLastname(String Lastname) {
    this.Lastname = Lastname;
    }

    public String getLastname() {
    return Lastname;
    }

    public void setAddress(String Address) {
    this.Address = Address;
    }

    public String getAddress() {
    return Address;
    }
    private String Lastname;
    private String Address;

    public Person() {
    super();
    }

    public Person(String sId,String sFirstname,String sLastname,String sAddress) {
    Id=sId;
    Firstname=sFirstname;
    Lastname=sLastname;
    Address=sAddress;
    }
    }

    MyLoadInvocable.java

    装载数据的任务,主要是通过驱动这个任务的run方法,把数据在各个节点中进行装载

    package dataload;

    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.net.AbstractInvocable;
    import com.tangosol.net.CacheFactory;
    import com.tangosol.net.NamedCache;

    import java.io.IOException;
    import java.io.Serializable;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import java.util.Hashtable;
    import java.util.Iterator;
    import java.util.List;

    import javax.naming.Context;
    import javax.naming.InitialContext;

    import serp.bytecode.NameCache;


    public class MyLoadInvocable extends AbstractInvocable implements PortableObject {

    private List<String> m_memberKeys;
    private String m_cache;

    public MyLoadInvocable() {
    super();
    }

    public MyLoadInvocable(List<String> memberKeys, String cache) {
    m_memberKeys = memberKeys;
    m_cache = cache;
    }

    public Connection getConnection() {
    Connection m_con = null;
    try {
    Context ctx = null;


    Hashtable<String,String> ht = new Hashtable<String,String>();
    ht.put(Context.INITIAL_CONTEXT_FACTORY,"weblogic.jndi.WLInitialContextFactory");
    ht.put(Context.PROVIDER_URL,"t3://localhost:7001");
    ctx = new InitialContext(ht);
    javax.sql.DataSource ds= (javax.sql.DataSource) ctx.lookup("ds");

    m_con = ds.getConnection();
    } catch (Exception e) {
    System.out.println(e.getMessage());
    }

    return m_con;
    }

    public void run() {
    System.out.println("Enter MyLoadInvocable run....");
    NamedCache cache = CacheFactory.getCache(m_cache);
    Person person = null;
    Connection con = getConnection();
    String sSQL = "SELECT id, firstname,lastname,address FROM persons WHERE id = ?";
    System.out.println("Enter load= "+sSQL);

    try
    {
    PreparedStatement stmt = con.prepareStatement(sSQL);

    for(int i = 0; i < m_memberKeys.size(); i++)
    {

    String id = (String)m_memberKeys.get(i);
    //System.out.println(list.get(i));
    System.out.println("==========="+id);

    stmt.setString(1, id);
    ResultSet rslt = stmt.executeQuery();
    if (rslt.next())
    {
    person = new Person(rslt.getString("id"),rslt.getString("firstname"),rslt.getString("lastname"),rslt.getString("address"));
    cache.put(person.getId(),person);

    }
    // stmt.close();

    }

    stmt.close();

    }catch (Exception e)
    {
    System.out.println("==="+e.getMessage());
    }


    }

    public void readExternal(PofReader in)
    throws IOException
    {
    m_memberKeys = (List<String>) in.readObject(0);
    m_cache = (String) in.readObject(1);
    }

    /**
    * {@inheritDoc}
    */
    public void writeExternal(PofWriter out)
    throws IOException
    {
    out.writeObject(0, m_memberKeys);
    out.writeObject(1, m_cache);
    }


    }

     

    LoadUsingEP.java

    装载的客户端,负责数据分段,InvocationService查找以及驱动。

    package dataload;

    import com.tangosol.net.CacheFactory;
    import com.tangosol.net.InvocationService;
    import com.tangosol.net.Member;
    import com.tangosol.net.NamedCache;
    import com.tangosol.net.PartitionedService;
    import com.tangosol.util.InvocableMap;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    import java.sql.SQLException;

    import java.sql.Statement;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Hashtable;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import javax.naming.Context;
    import javax.naming.InitialContext;

    public class LoaderUsingEP {

    private Connection m_con;

    public Connection getConnection() {
    try {
    Context ctx = null;

    Hashtable<String,String> ht = new Hashtable<String,String>();
    ht.put(Context.INITIAL_CONTEXT_FACTORY,"weblogic.jndi.WLInitialContextFactory");
    ht.put(Context.PROVIDER_URL,"t3://localhost:7001");
    ctx = new InitialContext(ht);
    javax.sql.DataSource ds= (javax.sql.DataSource) ctx.lookup("ds");

    m_con = ds.getConnection();
    } catch (Exception e) {
    System.out.println(e.getMessage());
    }

    return m_con;
    }

    protected Set getStorageMembers(NamedCache cache)
    {
    return ((PartitionedService) cache.getCacheService())
    .getOwnershipEnabledMembers();
    }

    protected Map<Member, List<String>> divideWork(Set members)
    {
    Iterator i = members.iterator();
    Map<Member, List<String>> mapWork = new HashMap(members.size());

    try {
    String sql = "select count(*) from persons";
    int totalcount = 0;
    int membercount = members.size();
    Connection con = getConnection();
    Statement st = con.createStatement();
    ResultSet rs = st.executeQuery(sql);
    while (rs.next())
    totalcount = Integer.parseInt(rs.getString(1));

    int onecount = totalcount / membercount;
    int lastcount = totalcount % membercount;

    sql = "select id from persons";

    ResultSet rs1 = st.executeQuery(sql);
    int count = 0;
    int currentworker=0;
    ArrayList<String> list=new ArrayList<String>();

    while (rs1.next()) {

    if (count < onecount) {

    list.add(rs1.getString("id"));
    count++;
    } else {

    Member member = (Member) i.next();

    ArrayList<String> list2=new ArrayList<String>();
    list2.addAll(list);
    mapWork.put(member, list2);

    list.clear();

    /* print the list value
    for(Map.Entry<Member, List<String>> entry:mapWork.entrySet()){
    System.out.println("first="+entry.getKey());
    List<String> memberKeys = entry.getValue();
    for(int j = 0; j < memberKeys.size(); j++)
    {
    System.out.println("firsttime="+memberKeys.get(j));
    //System.out.println(list.get(i));
    }

    }
    */
    count=0;
    list.add(rs1.getString("id"));
    count++;

    currentworker ++;

    if (currentworker == membercount-1) {
    onecount = onecount+lastcount;
    }

    }



    }

    Member member = (Member) i.next();
    mapWork.put(member, list);

    st.close();
    con.close();
    } catch (Exception e) {
    System.out.println("Exception...."+e.getMessage());
    }

    for(Map.Entry<Member, List<String>> entry:mapWork.entrySet()){
    System.out.println("final="+entry.getKey());
    List<String> memberKeys = entry.getValue();
    for(int j = 0; j < memberKeys.size(); j++)
    {
    System.out.println(memberKeys.get(j));
    }

    }
    return mapWork;
    }

    public void load()
    {
    NamedCache cache = CacheFactory.getCache("SampleCache");

    Set members = getStorageMembers(cache);
    System.out.println("members"+members.size());

    Map<Member, List<String>> mapWork = divideWork(members);

    InvocationService service = (InvocationService)
    CacheFactory.getService("LocalInvocationService");

    for (Map.Entry<Member, List<String>> entry : mapWork.entrySet())
    {

    Member member = entry.getKey();
    List<String> memberKeys = entry.getValue();
    System.out.println(memberKeys.size());

    MyLoadInvocable task = new MyLoadInvocable(memberKeys, cache.getCacheName());
    service.execute(task, Collections.singleton(member), null);
    }
    }

    public static void main(String[] args) {
    LoaderUsingEP ep = new LoaderUsingEP();
    ep.load();
    }


    }

    需要配置的客户端schema

    storage-override-client.xml

    <?xml version="1.0"?>
    <!DOCTYPE cache-config SYSTEM "cache-config.dtd">
    <cache-config>
    <caching-scheme-mapping>
    <!--
    Caches with names that start with 'DBBacked' will be created
    as distributed-db-backed.
    -->
    <cache-mapping>
    <cache-name>SampleCache</cache-name>
    <scheme-name>distributed-pof</scheme-name>
    </cache-mapping>
    </caching-scheme-mapping>
    <caching-schemes>
    <!--
    DB Backed Distributed caching scheme.
    -->
    <distributed-scheme>
    <scheme-name>distributed-pof</scheme-name>
    <service-name>DistributedCache</service-name>
    <backing-map-scheme>


    <read-write-backing-map-scheme>

    <internal-cache-scheme>
    <local-scheme/>
    </internal-cache-scheme>


    <cachestore-scheme>
    <class-scheme>
    <class-name>dataload.DBCacheStore</class-name>
    <init-params>
    <init-param>
    <param-type>java.lang.String</param-type>
    <param-value>persons</param-value>
    </init-param>
    </init-params>
    </class-scheme>
    </cachestore-scheme>
    </read-write-backing-map-scheme>
    </backing-map-scheme>

    <listener/>
    <autostart>true</autostart>
    <local-storage>false</local-storage>
    </distributed-scheme>

    <invocation-scheme>
    <scheme-name>my-invocation</scheme-name>
    <service-name>LocalInvocationService</service-name>
    <thread-count>5</thread-count>
    <autostart>true</autostart>
    </invocation-scheme>


    </caching-schemes>
    </cache-config>

    存储节点的Schema

    <?xml version="1.0"?>
    <!DOCTYPE cache-config SYSTEM "cache-config.dtd">
    <cache-config>
    <caching-scheme-mapping>
    <!--
    Caches with names that start with 'DBBacked' will be created
    as distributed-db-backed.
    -->
    <cache-mapping>
    <cache-name>SampleCache</cache-name>
    <scheme-name>distributed-pof</scheme-name>
    </cache-mapping>
    </caching-scheme-mapping>
    <caching-schemes>
    <!--
    DB Backed Distributed caching scheme.
    -->
    <distributed-scheme>
    <scheme-name>distributed-pof</scheme-name>
    <service-name>DistributedCache</service-name>
    <backing-map-scheme>


    <read-write-backing-map-scheme>

    <internal-cache-scheme>
    <local-scheme/>
    </internal-cache-scheme>


    <cachestore-scheme>
    <class-scheme>
    <class-name>dataload.DBCacheStore</class-name>
    <init-params>
    <init-param>
    <param-type>java.lang.String</param-type>
    <param-value>persons</param-value>
    </init-param>
    </init-params>
    </class-scheme>
    </cachestore-scheme>
    </read-write-backing-map-scheme>
    </backing-map-scheme>

    <listener/>
    <autostart>true</autostart>
    <local-storage>true</local-storage>
    </distributed-scheme>

    <invocation-scheme>
    <scheme-name>my-invocation</scheme-name>
    <service-name>LocalInvocationService</service-name>
    <thread-count>5</thread-count>
    <autostart>true</autostart>
    </invocation-scheme>


    </caching-schemes>
    </cache-config>

    输出结果

     

     可见数据分片装载.

  • 相关阅读:
    Python之格式化unix时间戳
    Python简单的验证码生成
    Python字符串常用的一些东西
    PHP explode()函数
    PHP函数number_format()
    PHP简单的计算位数的函数
    python之列表推导式
    python之把列表当做队列使用
    python之列表操作的几个函数
    python之map函数
  • 原文地址:https://www.cnblogs.com/ericnie/p/6130545.html
Copyright © 2011-2022 走看看