zoukankan      html  css  js  c++  java
  • 每月博客-20161205

    根据之前制定的计划,每两周至少阅读一段比较有意义的代码,每月写一次博客。

    之前计划读JDK的源码,发现还是比较高深,一时不太好读懂,于是向同事要了一段他写过的代码来阅读分析。

    个人感觉,拿出一段可运行、有意义的代码,描述实现的功能,并添加适当的注释,分享出来,对于个人提升水平很有帮助,交流分享也可以使得大家都有所提升。

    具体的内容如下(未经授权,不得随意转载):

    //程序实现功能:多线程并发读取文件并打印


    //第一部分 Main函数
    package com.loong.test;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Hashtable;

    import com.loong.data.imp.BigFileReader;
    import com.loong.data.imp.IHandle;

    public class Main {
    public static void main(String[] args) throws Exception {
    long startTime = System.currentTimeMillis();

    BigFileReader.Builder builder = new BigFileReader.Builder("D:\账号数据\aaaaa.unl", new IHandle(){
    @Override
    public void handle(String line, long index) {
    System.out.println(line);
    }
    });
    builder.setTreahdSize(4).setCharset("GBK").setBufferSize(1024*1024);
    BigFileReader bigFileReader = builder.build();
    bigFileReader.start();


    }
    }

    //第二部分 实现类
    package com.loong.data.imp;

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel.MapMode;
    import java.util.HashSet;
    import java.util.LinkedHashSet;
    import java.util.Set;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    /**
     * Reads a file line by line using several threads.
     *
     * <p>The file is split into byte ranges ("slices") whose boundaries are moved forward to the
     * next line delimiter. Each slice is memory-mapped read-only and scanned by its own task.
     * Every non-empty line is decoded and passed to the user's {@link IHandle}. The callback is
     * invoked concurrently from pool threads. Lines are delimited by {@code '\n'} or {@code '\r'};
     * a {@code "\r\n"} pair produces an empty line, which is skipped.
     *
     * <p>After the last slice finishes (successfully or not), timing and line count are printed
     * and the reader shuts itself down.
     */
    public class BigFileReader {
        // Byte values that terminate a line.
        private static final byte LF = '\n';
        private static final byte CR = '\r';

        // Number of worker threads.
        private final int threadSize;
        // Charset used to decode lines; null means the platform default charset.
        private final String charset;
        // Number of bytes copied out of a mapped slice per read.
        private final int bufferSize;
        // User callback invoked once per non-empty line.
        private final IHandle handle;
        // Pool running one SliceReaderTask per slice.
        private final ExecutorService executorService;
        // File length captured at construction time.
        private final long fileLength;
        // Shared read-only handle, used for partitioning and to obtain the channel for mapping.
        private final RandomAccessFile rAccessFile;
        // Slice boundaries; LinkedHashSet keeps them in file order.
        private final Set<StartEndPail> startEndPails;
        // Slices whose task has not finished yet. The last task to finish triggers finish().
        // This replaces a blocking barrier, which deadlocks when slices outnumber pool threads.
        private final AtomicInteger remainingSlices = new AtomicInteger(0);
        // Number of lines handed to the callback so far.
        private final AtomicLong counter = new AtomicLong(0);
        // Taken in start() after partitioning; read by whichever thread runs finish().
        private volatile long startTime;

        /**
         * Creates a reader. Only called by {@link Builder#build()}.
         *
         * @throws IllegalArgumentException if the file cannot be opened for reading
         */
        private BigFileReader(File file, IHandle handle, String charset, int bufferSize, int threadSize) {
            this.fileLength = file.length();
            this.handle = handle;
            this.charset = charset;
            this.bufferSize = bufferSize;
            this.threadSize = threadSize;
            try {
                this.rAccessFile = new RandomAccessFile(file, "r");
            } catch (FileNotFoundException e) {
                // Fail fast instead of leaving a null handle that would NPE later.
                throw new IllegalArgumentException("cannot open file for reading: " + file, e);
            }
            this.executorService = Executors.newFixedThreadPool(threadSize);
            this.startEndPails = new LinkedHashSet<StartEndPail>();
        }

        /**
         * Partitions the file into slices and submits one reader task per slice.
         *
         * <p>Returns immediately; reading happens on the pool. If partitioning fails, the error
         * is printed and the reader is shut down.
         */
        public void start() {
            // Ceiling division, at least 1 byte. Every slice except the last is at least this
            // long, so at most threadSize slices are produced. A zero size would make the
            // partitioner seek to a negative offset.
            long everySize = fileLength / threadSize + (fileLength % threadSize == 0 ? 0 : 1);
            everySize = Math.max(1L, everySize);
            try {
                calculateStartEnd(0, everySize);
            } catch (IOException e) {
                e.printStackTrace();
                shutdown();
                return;
            }
            startTime = System.currentTimeMillis();
            if (startEndPails.isEmpty()) {
                // Empty file: nothing to read, report and release resources right away.
                finish();
                return;
            }
            // Set before any task is submitted so no task can decrement ahead of it.
            remainingSlices.set(startEndPails.size());
            for (StartEndPail pail : startEndPails) {
                System.out.println("分配分片: " + pail);
                this.executorService.execute(new SliceReaderTask(pail));
            }
        }

        /**
         * Splits {@code [start, fileLength)} into slices of about {@code size} bytes.
         * Each slice end is extended to the next line delimiter or to the last byte of the file.
         */
        private void calculateStartEnd(long start, long size) throws IOException {
            long lastByte = fileLength - 1;
            long sliceStart = start;
            while (sliceStart <= lastByte) {
                StartEndPail pair = new StartEndPail();
                pair.start = sliceStart;
                long endPosition = sliceStart + size - 1;
                pair.end = endPosition >= lastByte ? lastByte : alignToLineEnd(endPosition);
                startEndPails.add(pair);
                sliceStart = pair.end + 1;
            }
        }

        /**
         * Moves {@code position} forward until it points at a line delimiter.
         * Returns the file's last byte index if no delimiter is found first.
         */
        private long alignToLineEnd(long position) throws IOException {
            long lastByte = fileLength - 1;
            rAccessFile.seek(position);
            int b = rAccessFile.read();
            while (b != LF && b != CR) {
                position++;
                if (position >= lastByte || b == -1) {
                    return lastByte;
                }
                // read() already advanced the file pointer to `position`; no seek needed.
                b = rAccessFile.read();
            }
            return position;
        }

        /** Closes the file handle and stops accepting new tasks on the pool. */
        public void shutdown() {
            try {
                this.rAccessFile.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            this.executorService.shutdown();
        }

        /**
         * Decodes one line's bytes and passes the line to the user callback when non-empty.
         *
         * @param bytes raw line bytes, without the delimiter
         * @throws Exception if the configured charset is unsupported, or thrown by the callback
         */
        public void handle(byte[] bytes) throws Exception {
            String line = this.charset == null ? new String(bytes) : new String(bytes, charset);
            if (!line.isEmpty()) {
                long index = counter.incrementAndGet();
                this.handle.handle(line, index);
            }
        }

        /** Called by each task exactly once; the last caller reports and shuts down. */
        private void sliceFinished() {
            if (remainingSlices.decrementAndGet() == 0) {
                finish();
            }
        }

        /** Prints elapsed time and total line count, then shuts the reader down. */
        private void finish() {
            System.out.println("use time: " + (System.currentTimeMillis() - startTime));
            System.out.println("all line: " + counter.get());
            shutdown();
        }

        /** Inclusive byte range {@code [start, end]} of one slice. */
        private static class StartEndPail {
            public long start;
            public long end;

            @Override
            public String toString() {
                return "stat=" + start + ";end=" + end;
            }

            @Override
            public int hashCode() {
                final int prime = 31;
                int result = 1;
                result = prime * result + (int) (end ^ (end >>> 32));
                result = prime * result + (int) (start ^ (start >>> 32));
                return result;
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                StartEndPail other = (StartEndPail) obj;
                return end == other.end && start == other.start;
            }
        }

        /**
         * Maps one slice read-only and emits every line it contains.
         * Always reports completion in {@code finally}, so a failing slice cannot stall shutdown.
         */
        public class SliceReaderTask implements Runnable {
            private final long start;
            private final long sliceSize;
            private final byte[] readBuff;

            public SliceReaderTask(StartEndPail pair) {
                this.start = pair.start;
                this.sliceSize = pair.end - pair.start + 1;
                this.readBuff = new byte[bufferSize];
            }

            @Override
            public void run() {
                try {
                    // FileChannel.map rejects sizes above Integer.MAX_VALUE; such a slice fails here.
                    MappedByteBuffer mapBuffer =
                            rAccessFile.getChannel().map(MapMode.READ_ONLY, start, this.sliceSize);
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    // long offset so the loop condition cannot overflow for large buffers.
                    for (long offset = 0; offset < sliceSize; offset += bufferSize) {
                        int readLength = (int) Math.min(bufferSize, sliceSize - offset);
                        mapBuffer.get(readBuff, 0, readLength);
                        for (int i = 0; i < readLength; i++) {
                            byte b = readBuff[i];
                            if (b == LF || b == CR) {
                                handle(bos.toByteArray());
                                bos.reset();
                            } else {
                                bos.write(b);
                            }
                        }
                    }
                    // Trailing line without a delimiter at the end of the slice.
                    if (bos.size() > 0) {
                        handle(bos.toByteArray());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    sliceFinished();
                }
            }
        }

        /** Fluent builder for {@link BigFileReader}. */
        public static class Builder {
            private int threadSize = 1;
            private String charset = null;
            private int bufferSize = 1024 * 1024;
            private IHandle handle;
            private File file;

            /**
             * @param file   path of the file to read; must exist
             * @param handle callback receiving each non-empty line; must not be null
             * @throws IllegalArgumentException if the file does not exist or handle is null
             */
            public Builder(String file, IHandle handle) {
                this.file = new File(file);
                if (!this.file.exists()) {
                    throw new IllegalArgumentException("文件不存在!");
                }
                if (handle == null) {
                    throw new IllegalArgumentException("handle must not be null");
                }
                this.handle = handle;
            }

            /**
             * Sets the number of worker threads.
             *
             * @deprecated misspelled; use {@link #setThreadSize(int)}.
             */
            @Deprecated
            public Builder setTreahdSize(int size) {
                return setThreadSize(size);
            }

            /**
             * Sets the number of worker threads.
             *
             * @throws IllegalArgumentException if {@code size < 1}
             */
            public Builder setThreadSize(int size) {
                if (size < 1) {
                    // 0 would divide by zero when slicing.
                    throw new IllegalArgumentException("threadSize must be >= 1: " + size);
                }
                this.threadSize = size;
                return this;
            }

            /** Sets the charset used to decode lines; null means the platform default. */
            public Builder setCharset(String charset) {
                this.charset = charset;
                return this;
            }

            /**
             * Sets how many bytes are copied from a mapped slice per read.
             *
             * @throws IllegalArgumentException if {@code bufferSize < 1}
             */
            public Builder setBufferSize(int bufferSize) {
                if (bufferSize < 1) {
                    // 0 would make the read loop never advance.
                    throw new IllegalArgumentException("bufferSize must be >= 1: " + bufferSize);
                }
                this.bufferSize = bufferSize;
                return this;
            }

            public BigFileReader build() {
                return new BigFileReader(this.file, this.handle, this.charset, this.bufferSize, this.threadSize);
            }
        }
    }

  • 相关阅读:
    博客系统-验证码相关
    博客系统-登录注册
    Django-wsgi实例
    Django-启动文件的制作
    css实现轮播效果图
    Django-celery分布式任务
    无聊。。。。。
    Nginx简单了解
    IIC知识
    BootLoader的一些知识
  • 原文地址:https://www.cnblogs.com/songtianbao/p/6135077.html
Copyright © 2011-2022 走看看