zoukankan      html  css  js  c++  java
  • Distributed Hash Table

      The Chord algorithm is used to look up keys in a Distributed Hash Table (DHT) on a peer-to-peer network, where each peer remembers the addresses of only a few other peers yet can still reach whichever peer is responsible for a given key. This demo program is largely based on that idea, with some modifications and simplifications.

      Moreover, in this program I use Java UDP sockets to simulate a simple reliable data transfer protocol that can deal with bit errors, packet errors and packet loss. Each message is encapsulated with a SEQ number, an ACK number and a CRC checksum. To simplify application-layer programming, I provide a Client class and a Server class with some convenient APIs. A Server should always listen and answer alternately, and can be informed or queried by calling a Client method.

     

      Makefile:

    # Build both demo programs into ./bin. When saving this as a real Makefile,
    # every recipe line must start with a TAB character.
    # 'all' and 'clean' name actions, not files, so they are declared phony.
    .PHONY: all clean

    all: ./bin/Chord.class ./bin/Peer.class
    
    # javac needs the destination directory; create it if it is missing.
    ./bin/Chord.class: ./src/Chord.java ./src/Peer.java
        mkdir -p ./bin
        javac -classpath ./bin/ -d ./bin/ ./src/Chord.java
    
    ./bin/Peer.class: ./src/Peer.java
        mkdir -p ./bin
        javac -d ./bin/ ./src/Peer.java
    
    # -f keeps 'make clean' from failing when nothing has been built yet.
    clean:
        rm -f ./bin/*.class

    1. Chord.java

     1 import java.util.concurrent.*;
     2 import java.util.*;
     3 import java.io.*;
     4 
     5 
     /**
      * Launcher for the demo: starts one {@code Peer} process per requested peer,
      * tells every peer the ports of all peers through its standard input, and
      * relays each peer's standard output to this process's standard output.
      *
      * <p>One {@code Chord} thread is created per child process; it owns that
      * child's standard input and output streams.
      */
     public class Chord extends Thread {
         /** Lowest port handed to a peer. */
         private static final int BASE_PORT = 1024;
         /** Number of consecutive ports, starting at BASE_PORT, that peers are drawn from. */
         private static final int PORT_SPAN = 8192;

         private static int num;
         private static Process[] p;
         private static Chord[] t;
         private static int[] port;
         private BufferedReader in;
         private PrintWriter out;

         /**
          * Attaches to the standard streams of {@code proc} and immediately starts
          * the relay thread.
          *
          * @param proc the child process to feed and to read from
          */
         public Chord(Process proc) {
             in = new BufferedReader(new InputStreamReader(proc.getInputStream()));
             out = new PrintWriter(new OutputStreamWriter(proc.getOutputStream()));
             start();
         }

         /**
          * Writes the peer count and every peer port to the child, closes the
          * child's input, then echoes the child's output line by line until the
          * child prints a line starting with '#' or its output ends.
          */
         @Override
         public void run() {
             try {
                 out.println(num);
                 for (int i = 0; i < num; i++) {
                     out.println(port[i]);
                 }
                 // Closing flushes the buffered lines to the child.
                 out.close();
                 String line;
                 // readLine() returns null when the child's output ends (for
                 // example because the child died); stop instead of dereferencing it.
                 while ((line = in.readLine()) != null) {
                     // An empty line has no first character to inspect; just relay it.
                     if (!line.isEmpty() && line.charAt(0) == '#') {
                         break;
                     }
                     System.out.println(line);
                 }
             } catch (IOException e) {
                 e.printStackTrace();
             } finally {
                 try {
                     in.close();
                 } catch (IOException ignored) {
                     // Nothing useful can be done if closing the pipe fails.
                 }
             }
         }

         /**
          * Entry point.
          *
          * @param args {@code args[0]} is the number of peers to launch
          * @throws NumberFormatException if {@code args[0]} is not an integer
          * @throws IllegalArgumentException if the peer count is negative or larger
          *         than the number of available ports
          */
         public static void main(String[] args) {
             if (args.length < 1) {
                 System.err.println("Usage: java Chord <number-of-peers>");
                 return;
             }
             num = Integer.parseInt(args[0]);
             if (num < 0 || num > PORT_SPAN) {
                 throw new IllegalArgumentException(
                         "number of peers must be between 0 and " + PORT_SPAN + ", got " + num);
             }
             p = new Process[num];
             t = new Chord[num];
             port = new int[num];
             Random rand = new Random();
             // Draw distinct ports: two peers given the same port could not both bind it.
             Set<Integer> taken = new HashSet<Integer>();
             for (int i = 0; i < num; i++) {
                 int candidate;
                 do {
                     candidate = BASE_PORT + rand.nextInt(PORT_SPAN);
                 } while (!taken.add(candidate));
                 port[i] = candidate;
             }
             System.out.println("\nWait Please ~\n");
             try {
                 for (int i = 0; i < num; i++) {
                     /* Thread t[i] is resp for the standard IO of Process p[i] */
                     p[i] = new ProcessBuilder(
                             "java", "-classpath", "./bin/", "Peer", String.valueOf(port[i]))
                             // Nobody reads the child's stderr here, so let it go
                             // straight to this process's stderr instead of a pipe.
                             .redirectError(ProcessBuilder.Redirect.INHERIT)
                             .start();
                     t[i] = new Chord(p[i]);
                     TimeUnit.MILLISECONDS.sleep(100);
                 }
                 for (int i = 0; i < num; i++) {
                     p[i].waitFor();
                     t[i].join();
                 }
             } catch (IOException e) {
                 e.printStackTrace();
             } catch (InterruptedException e) {
                 // Preserve the interrupt for whoever runs this thread.
                 Thread.currentThread().interrupt();
                 e.printStackTrace();
             }
             System.out.println();
         }
     }

    2. Peer.java

      1 import java.util.concurrent.atomic.*;
      2 import java.util.concurrent.*;
      3 import java.util.*;
      4 import java.text.*;
      5 import java.net.*;
      6 import java.io.*;
      7 
      8 
      /**
       * Base class that uses UDP sockets to simulate reliable data transfer with
       * packet loss/error detection and retransmission.
       *
       * <p>Every message is a fixed-size frame:
       * SEQ (1 byte) + ACK (1 byte) + content + CRC checksum (4 bytes).
       * <ul>
       *   <li>SEQ == 0: no reply is expected (a client "ACK" message);</li>
       *   <li>otherwise a reply carrying the identical value in its ACK field is
       *       expected.</li>
       * </ul>
       *
       * <p>NOTE(review): a single {@code ack} field serves both as the value
       * written into outgoing frames and as the value expected in incoming ones,
       * and it is overwritten by every {@link #send}. Calling {@code send} twice
       * in a row (a retransmission) therefore writes the previous SEQ, not the
       * peer's SEQ, into the ACK field -- confirm this is intended.
       */
      abstract class RDT {

          /** Generator polynomial for the checksum (the common CRC-32 polynomial). */
          private static final int GEN = 0x04C11DB7;
          /** Total frame size in bytes. */
          private static final int SIZE = 256;
          /** Bytes occupied by the SEQ and ACK fields at the start of a frame. */
          private static final int HEADER = 2;
          /** Bytes occupied by the checksum at the end of a frame. */
          private static final int TRAILER = 4;
          /** Largest content, in bytes, that fits between header and checksum. */
          private static final int MAX_PAYLOAD = SIZE - HEADER - TRAILER;
          /** Timeout value for subclasses to configure on their sockets. */
          protected static final int TIME_LIMIT = 10;
          protected static Random rand = new Random();
          protected InetAddress addr;
          protected int targetPort;
          protected DatagramSocket socket;
          private DatagramPacket packet;
          private byte[] buffer;
          private byte ack = 0;

          /**
           * Receives frames until one has a valid checksum and the expected ACK
           * number, then returns its content field.
           *
           * <p>The returned string always has {@code SIZE - 6} characters: the
           * content followed by the frame's zero padding. When the accepted frame
           * has a non-zero SEQ, its sender becomes the target of later sends.
           *
           * @return the content field, or {@code null} if receiving failed for any
           *         reason (including a socket timeout)
           */
          protected String rcvd() {
              /* Receive a message with desired ACK # */
              try {
                  do {
                      buffer = new byte[SIZE];
                      packet = new DatagramPacket(buffer, SIZE);
                      socket.receive(packet);
                  } while (checksum() != 0 || buffer[1] != ack);
                  ack = buffer[0];    // next ACK # to be written
                  if (ack != 0) {
                      addr = packet.getAddress();
                      targetPort = packet.getPort();
                  }
                  return new String(buffer, java.nio.charset.StandardCharsets.US_ASCII)
                          .substring(HEADER, SIZE - TRAILER);
              } catch (Exception e) {
                  // Timeout Exception: the packet is lost!
                  return null;
              }
          }

          /**
           * Sends one frame with 'best effort': a failure to transmit is reported
           * on standard output and otherwise ignored.
           *
           * @param seq SEQ number to put in the frame
           * @param str content; encoded as ASCII
           * @throws IllegalArgumentException if the encoded content is longer than
           *         the frame's content field; previously such content overwrote the
           *         checksum field or was dropped entirely
           * @throws NullPointerException if {@code str} is null
           */
          protected void send(byte seq, String str) {
              byte[] payload = str.getBytes(java.nio.charset.StandardCharsets.US_ASCII);
              if (payload.length > MAX_PAYLOAD) {
                  throw new IllegalArgumentException("message of " + payload.length
                          + " bytes exceeds the " + MAX_PAYLOAD + "-byte limit");
              }
              buffer = new byte[SIZE];
              buffer[0] = seq;
              buffer[1] = ack;
              System.arraycopy(payload, 0, buffer, HEADER, payload.length);
              // The checksum is computed while its own field is still zero, then
              // stored least-significant byte first.
              int cks = checksum();
              for (int i = 0; i < TRAILER; i++) {
                  buffer[SIZE - TRAILER + i] = (byte) (cks >>> (8 * i));
              }
              // encapsulate and send the packet
              packet = new DatagramPacket(buffer, SIZE, addr, targetPort);
              try {
                  socket.send(packet);
              } catch (Exception e) {
                  System.out.println("Cannot send msg: "+str);
              }
              // next ACK # to be read
              ack = buffer[0];
          }

          /**
           * Calculates the CRC checksum of the whole current frame.
           *
           * <p>Bits are consumed least-significant first within each byte. The
           * remainder is bit-reversed so that, once stored by {@link #send}, the
           * checksum bytes continue the same bit order; a frame received intact
           * then yields 0.
           */
          private int checksum() {
              int rem = 0;
              final int bitCount = SIZE << 3;
              for (int pos = 0; pos < bitCount; pos++) {
                  int bit = (buffer[pos >> 3] >> (pos & 7)) & 1;
                  boolean topBitSet = rem < 0;
                  rem = (rem << 1) | bit;
                  if (topBitSet) {
                      rem ^= GEN;
                  }
              }
              return Integer.reverse(rem);
          }

          /**
           * Tests whether a message starts with the three characters "ACK".
           *
           * @param str message to test; may be null
           * @return true iff {@code str} is non-null and begins with "ACK"
           * @throws Exception never thrown by this implementation; kept so existing
           *         callers compile unchanged
           */
          protected static boolean isACK(String str) throws Exception {
              return str != null && str.startsWith("ACK");
          }
      }
    117 
    118 
    119 class Client extends RDT {
    120     /**    An RDT client should always send a message first,
    121      *        then receives a reply, and finally send an "ACK"
    122      *         message to terminate the conversation.
    123      *     An "ACK" message from the client indicates that
    124      *        it has received the reply and left.
    125      */
    126     
    127     private Client(int port) {
    128         try {
    129             addr = InetAddress.getByName("localhost");
    130             targetPort = port;
    131             socket = new DatagramSocket();
    132             socket.setSoTimeout(TIME_LIMIT);
    133         } catch(Exception e) {
    134             e.printStackTrace();
    135         }
    136     }
    137     public static String inquire(int port,String str) {
    138         /* Send a inquiry and return a pending reply */
    139         Client c = new Client(port);
    140         String res = null;
    141         while (res==null) {
    142             c.send((byte)(1+rand.nextInt(255)),str);
    143             res = c.rcvd();
    144         }
    145         c.send((byte)0,"ACK");
    146         c.socket.close();
    147         return res;
    148     }
    149     public static void inform(int port,String str) {
    150         /* Send a piece of info and get an "ACK" */
    151         Client c = new Client(port);
    152         String res = null;
    153         try {
    154             while (!isACK(res)) {
    155                 c.send((byte)(1+rand.nextInt(255)),str);
    156                 res = c.rcvd();
    157             }
    158         } catch (Exception e) {
    159             e.printStackTrace();
    160         }
    161         c.send((byte)0,"ACK");
    162         c.socket.close();
    163     }
    164 }
    165 
    166 
    167 class Server extends RDT {
    168     /**    An RDT server should always listen to a request 
    169      *        first, then respond with a reply, and finally
    170      *        get an "ACK" message.
    171      *    An "ACK" message from the server indicates that
    172      *        it has been informed of something.
    173      */
    174     
    175     public Server(int port) {
    176         try {
    177             socket = new DatagramSocket(port);
    178             socket.setSoTimeout(TIME_LIMIT);
    179         } catch (Exception e) {
    180             e.printStackTrace();
    181         }
    182     }
    183     public String listen() {
    184         String req = null;
    185         while (req==null) {
    186             req = rcvd();
    187         }
    188         return req;
    189     }
    190     public void answer(String str) {
    191         String res = null;
    192         try {
    193             while (!isACK(res)) {
    194                 send((byte)(1+rand.nextInt(255)),str);
    195                 res = rcvd();
    196             }
    197         } catch (Exception e) {
    198             e.printStackTrace();
    199         }
    200     }
    201 }
    202 
    203 
    204 public class Peer extends Thread {
    205     /**    A node on a Peer-to-Peer Network
    206      *    For consecutive peers with indexes i and j:
    207      *         peer[i] is resp for key k iff i <= k < j;
    208      *     finger[k] is the port # of the on-line peer with
    209      *         the minimum index not less than index+2^{k}.
    210      */
    211     
    212     private static final int LEN = 16;
    213     private static final int NUM = (1<<LEN);
    214     private static DecimalFormat df;
    215     private static int port, index;
    216     private static AtomicInteger[] finger;
    217     private Server sv;
    218     
    219     private Peer() {
    220         sv = new Server(port);
    221         start();
    222     }
    223     public void run() {
    224         while (true) {
    225             String msg = sv.listen();
    226             if (msg.charAt(0)=='?') {
    227                 msg = msg.substring(1,6);
    228                 int next = search(Integer.parseInt(msg));
    229                 if (next==port) {
    230                     sv.answer("!");
    231                 } else {
    232                     sv.answer("@"+next);
    233                 }
    234             } else if (msg.charAt(0)=='#') {
    235                 sv.answer("ACK");
    236                 break;
    237             }
    238         }
    239         System.out.println("#");
    240     }
    241     private static int calDelt(int val) {
    242         int delt = hashIdx(val)-index;
    243         return delt+((delt<=0)? NUM:0);
    244     }
    245     private static boolean within(int a,int x,int b) {
    246         a -= index;
    247         a += (a<0)? NUM:0;
    248         x -= index;
    249         x += (x<0)? NUM:0;
    250         b -= index;
    251         b += (b<0)? NUM:0;
    252         return a<=x && x<b;
    253     }
    254     private static boolean insfinger(int val) {
    255         /* Insert a port number to the finger table */
    256         boolean flag = false;
    257         if (val==port) {
    258             return flag;
    259         }
    260         int delt = calDelt(val);
    261         for (int i=0;(1<<i)<=delt;i++) {
    262             if (delt<calDelt(finger[i].get())) {
    263                 finger[i].set(val);
    264                 flag = true;
    265             }
    266         }
    267         return flag;
    268     }
    269     private static int search(int key) {
    270         /* Chord Algorithm Implementation */
    271         if (within(index,key,hashIdx(finger[0].get()))) {
    272             return port;
    273         }
    274         for (int i=0;i<LEN-1;i++) {
    275             int a = hashIdx(finger[i].get());
    276             int b = hashIdx(finger[i+1].get());
    277             if (b==index||within(a,key,b)) {
    278                 return finger[i].get();
    279             }
    280         }
    281         return finger[LEN-1].get();
    282     }
    283     private static int locate(int key) {
    284         /* Locate the peer who is resp for a given key */
    285         int next = search(key);
    286         if (next==port) {
    287             return port;
    288         }
    289         String s = null;
    290         while (true) {
    291             s = Client.inquire(next,"?"+df.format(key));
    292             if (s.charAt(0)=='@') {
    293                 s = s.substring(1,5);
    294                 next = Integer.parseInt(s);
    295             } else {
    296                 break;
    297             }
    298         }
    299         return next;
    300     }
    301     private static int hashIdx(int key) {
    302         /* Hash function using multiplication */
    303         double tmp = 0.61803*key;
    304         tmp -= Math.floor(tmp);
    305         return (int)Math.floor(NUM*tmp);
    306     }
    307     public static void main(String[] args) {
    308         df = new DecimalFormat("00000");
    309         try {
    310             port = Integer.parseInt(args[0]);
    311             index = hashIdx(port);
    312             finger = new AtomicInteger[LEN];
    313             for (int i=0;i<LEN;i++) {
    314                 finger[i] = new AtomicInteger(port);
    315             }
    316             // acquaint all the on-line peers
    317             Scanner in = new Scanner(System.in);
    318             int num = in.nextInt();
    319             for (int i=0;i<num;i++) {
    320                 insfinger(in.nextInt());
    321             }
    322             Peer server = new Peer();
    323             // for all peers to set up the finger table
    324             TimeUnit.MILLISECONDS.sleep(num*100);
    325             Random rand = new Random();
    326             for (int i=0;i<3;i++) {
    327                 // Each peer has three searching tasks.
    328                 int key = rand.nextInt(NUM);
    329                 System.out.println("Port "+port+": port "+locate(key)
    330                         +" is resp for key "+df.format(key));
    331             }
    332             // for all peers to finish the searching task
    333             TimeUnit.MILLISECONDS.sleep(num*100);
    334             Client.inform(port,"#");
    335             server.join();
    336         } catch (Exception e) {
    337             System.out.println("Main Error: "+e);
    338         }
    339     }
    340 }

    References:

      1. Kurose, James F., Keith W. Ross. Computer Networking: a top-down approach[M]. 北京:高等教育出版社, 2009-08

      2. Tanenbaum, Andrew S., David J. Wetherall. Computer Networks 5th edition[M]. 北京:清华大学出版社, 2011

  • 相关阅读:
    $NOIP2018$赛道修建
    $NOIP2005$过河
    $NOIP2014$飞扬的小鸟
    $[SCOI2014]$方伯伯的玉米田
    大吉大利,晚上吃鸡!
    $HNOI2005$星际贸易
    $CF1142B$ $Lynyrd Skynyrd$
    $SDOI2015$排序
    $NOIP2003$侦探推理
    Build 2016概览
  • 原文地址:https://www.cnblogs.com/DevinZ/p/4557151.html
Copyright © 2011-2022 走看看