zoukankan      html  css  js  c++  java
  • FIR on Hadoop using hadoop-streaming

    1.Prepare Hadoop Streaming

    Hadoop streaming allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.

    1.1.Download Hadoop Streaming fit for your hadoop version

    For hadoop2.4.0, you can visit the following website and download the jar file:

    http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-streaming/2.4.0

    Than put the jar file to the $HADOOP_HOME folder.

    1.2.Hadoop-streaming test in my cluster:

    bin/hadoop jar hadoop-streaming-2.4.0.jar -input /in -output /out2 -mapper /bin/cat

    -reducer /usr/bin/wc

    1.3.Other map reduce demo here:

    Python:

    http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

    C++:

    http://blog.sina.com.cn/s/blog_62a9902f01018rzj.html

    2.FIR hardware:

    This is the code provided by xilinx workshop for audio application, we just use it to do our tests.

    2.1.fir.h

    #ifndef _FIR_H_
    #define _FIR_H_
    #include "ap_cint.h"
    #define N	59
    #define SAMPLES N+10 // just few more samples then number of taps
    typedef long	coef_t;
    typedef long	data_t;
    typedef long	acc_t;
    #endif

    2.2.fir_coef.dat

    -378,
    -73,
    27,
    170,
    298,
    352,
    302,
    168,
    14,
    -80,
    -64,
    53,
    186,
    216,
    40,
    -356,
    -867,
    -1283,
    -1366,
    -954,
    -51,
    1132,
    2227,
    2829,
    2647,
    1633,
    25,
    -1712,
    -3042,
    29229,
    -3042,
    -1712,
    25,
    1633,
    2647,
    2829,
    2227,
    1132,
    -51,
    -954,
    -1366,
    -1283,
    -867,
    -356,
    40,
    216,
    186,
    53,
    -64,
    -80,
    14,
    168,
    302,
    352,
    298,
    170,
    27,
    -73,
    -378

    2.3.fir.c

    #include "fir.h"
    
    void fir (
      data_t *y,
      data_t x
      ) {
      const coef_t c[N]={
     #include "fir_coef.dat"
        };
    
    
      static data_t shift_reg[N];
      acc_t acc;
      int i;
    
      acc=(acc_t)shift_reg[N-2]*(acc_t)c[N-1];
      loop: for (i=N-2;i!=0;i--) {
        acc+=(acc_t)shift_reg[i-1]*(acc_t)c[i];
        shift_reg[i]=shift_reg[i-1];
      }
      acc+=(acc_t)x*(acc_t)c[0];
      shift_reg[0]=x;
      *y = acc;
    }

    2.4.fir_test.c

    #include <stdio.h>
    #include <math.h>
    #include "fir.h"
    void fir (
      data_t *y,
      data_t x
      );
    
    int main () {
      FILE   *fp;
    
      data_t signal, output;
    
      fp=fopen("fir_impulse.dat","w");
      int i;
      for (i=0;i<SAMPLES;i++) {
    	  if(i==0)
    		  signal = 0x8000;
    	  else
    		  signal = 0;
    	  fir(&output,signal);
       	  printf("%i %d %d
    ",i,(int)signal,(int)output);
    //   	  fprintf(fp,"%i %d %d
    ",i,signal,output);
      }
      fclose(fp);
      return 0;
    }

    2.5 Pictures

    Here is the project in Vivado HLS:

    image

    Here is the Directive View:

    image

    In vivado Project Settings:

    image

    Block Design:

    image

    Reg Address:

    image

    2.6 Stand alone application:

    #include <stdio.h>
    #include "platform.h"
    #include "xfir_hw.h"
    #include "xparameters.h"
    #include "xil_io.h"
    #include "xfir.h"
    
    #define SAMPLES 68
    
    
    int main()
    {
    
    	int i = 0;
    	int k=0;
    	long signal, out;
    	u32 sample;
    
        init_platform();
        print("Hello World
    
    ");
    
        //filter_hw_accel_input(&out,signal);
        print("debug");
    
        for (i=0;i<59;i++) {
        	if(i==0)
        		signal = 0;
        	else
        		signal = 0;
        	filter_hw_accel_input(&out,signal);
        }
        for (i=0;i<SAMPLES;i++) {
            	if(i==0)
            		signal = 1;
            	else
            		signal = 0;
            	filter_hw_accel_input(&out,signal);
            	//printf("in=0x%x ",signal);
            	printf("out=%ld
    ",out);
            	printf("i = %d
    ",i);
            	sleep(1);
            }
        sample = 0x1111;
        Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA,sample);
        Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel
        Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0);
        while(1){
            			if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL))
            				break;
            			else
            				continue;
            		}
        sample = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA);
        printf("sample=0x%x
    ",sample);
    }
    
    void filter_hw_accel_input(long * Sample_out, long Sample_in)
        {
    
        		Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA, Sample_in); // send left channel sample
        		Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel
        		Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0);
        		while(1){
        			if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL))
        				break;
        			else
        				continue;
        		}
        		*Sample_out = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA);
        }

    2.7 standalone application configurations:

    Repo should be set to the folder that contains the driver folder in fir HLS project.

    image

    Board Support Package setting:

    image

    3 Linux:

    3.1 driver:

    #include <linux/module.h>
    #include <linux/version.h>
    #include <linux/kernel.h>
    #include <linux/init.h>
    #include <linux/fs.h>
    #include <linux/cdev.h>
    #include <linux/delay.h>
    #include <linux/spinlock.h>
    #include <linux/device.h>
    #include <linux/types.h>
    #include <linux/ioctl.h>
    
    #include <asm/io.h>
    #include <asm/uaccess.h>
    #include <asm/atomic.h>
    #include <linux/wait.h>
    #include <linux/cdev.h>
    
    #include <linux/interrupt.h>
    #include <asm/signal.h>
    #include <linux/gpio.h>
    #include <linux/irq.h>
    #include <linux/semaphore.h>
    
    #define DEVICE_NAME "fir_dev"
    #define FIR_BASEADDR 0x43C00000
    
    
    #define XFIR_DATA_IN_OFFSET 0x1C
    #define XFIR_DATA_OUT_OFFSET    0x14
    #define XFIR_CTRL_OFFSET    0x00
    #define XFIR_DONE_OFFSET    0x10
    
    static unsigned long fir_addr = 0;
    
    static struct class* fir_class = NULL;
    static struct device* fir_device = NULL;
    
    static int fir_major = 0;
    
    static struct semaphore sem;
    
    //arg means the led number, cmd controls it on or off
    static ssize_t fir_ioctl(struct file *file, unsigned int cmd, long *arg)
    {
        //printk("fun(gpio_ioctl):");
        long status = 0xff;
        int ret,ii;
        printk("******** in ioctl ************
    ");
        switch(cmd){
            case 0:
            case 1:
                //init fir put 59 zero 
                for(ii=0;ii<59;ii++)
                {
                    iowrite32(0x0,fir_addr+XFIR_DATA_IN_OFFSET);
                    iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET);
                    iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET);
                    while(!ioread32(fir_addr+XFIR_DONE_OFFSET));
                }
                //now read data from fir y
                //status = ioread32(fir_addr+FIR_DATA_OUT_OFFSET);
                printk("59 zeros has been put to fir!!
    ");
                return 0;
            case 3:
                printk("in data = %ld
    ",*arg);
                iowrite32(*arg,fir_addr+XFIR_DATA_IN_OFFSET);
                iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET);
                iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET);
                //now wait for fir generate ok
                while(!ioread32(fir_addr+XFIR_DONE_OFFSET));
                //now read data from fir y
                status = ioread32(fir_addr+XFIR_DATA_OUT_OFFSET);
                ret = __put_user(status, (long *)arg);
                printk("out data = 0x%x
    ",status);
                return 0;
            case 5:
                up(&sem);
                printk("sema up
    ");
                return 0;
            default:
                printk("default cmd=%d
    ",cmd);
                return -EINVAL;
        }
    }
    
    int fir_open(struct inode *inode, struct file *filp)
    {
        sema_init(&sem,1);
        down(&sem);
        printk("sema down
    ");
        return 0;
    }
    
    
    static struct file_operations fir_fops = {
        .owner = THIS_MODULE,
        .unlocked_ioctl = fir_ioctl,
        .open = fir_open,
    };
    
    static int __init fir_init(void)
    {
        int ret;
        ret = register_chrdev(0,DEVICE_NAME, &fir_fops);
        if(ret < 0)
        {
            printk("fir: can't get major number
    ");
            return ret;
        }
    
        fir_major = ret;
        fir_class = class_create(THIS_MODULE, "fir_class");
        if(IS_ERR(fir_class))
        {
            printk("fir: failed in creating class
    ");
            unregister_chrdev(fir_major, DEVICE_NAME);
            return -1;
        }
        fir_device = device_create(fir_class,NULL,MKDEV(fir_major,0),NULL,DEVICE_NAME);
        if(IS_ERR(fir_device))
        {
            printk("fir: failed in creating device!
    ");
            unregister_chrdev(fir_major, DEVICE_NAME);
            class_unregister(fir_class);
            class_destroy(fir_class);
            return -1;
        }
    
        fir_addr = (unsigned long) ioremap(FIR_BASEADDR, sizeof(u32));
    
        printk("fir installed successfully!
    ");
        return 0;
    }
    static void __exit fir_exit(void)
    {
        device_destroy(fir_class,MKDEV(fir_major, 0));
        class_unregister(fir_class);
        class_destroy(fir_class);
        unregister_chrdev(fir_major,DEVICE_NAME);
        printk("fir module exit!");
    }
    
    module_init(fir_init);
    module_exit(fir_exit);
    
    
    MODULE_AUTHOR("seg");
    MODULE_LICENSE("GPL");

    Makefile:

    # Makefile for globalmm Driver
    #           Program start 2014.3.14
    # 如果已经定义了KERNELRELEASE,则说明从内核构造系统调用的。
    # 因此可以利用其内建语句
    ifneq ($(KERNELRELEASE),)
        obj-m := fir_driver.o
    # 否则,要直接从命令行调用
    # 这时要调用内核构造系统
    else
    KERNELDIR = /root/zybo_gpio/linux-digilent/
    PWD := $(shell pwd)
    default:
        $(MAKE) -C $(KERNELDIR) M=$(PWD) modules ARCH=arm
    clean:
        rm -rf *.o *~ core .depend .*.cmd *.ko *.mod.c .tmp_versions modules.* Module.*
    endif

    3.2 test app:

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/ioctl.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <sys/select.h>
    #include <sys/time.h>
    
    static int fir_fd;
    
    int main(void)
    {
            int i;
            long val;
            // open device
            fir_fd = open("/dev/fir_mod", 0);
            if (fir_fd < 0) {
                    perror("open device gpio_dev error!
    ");
                    exit(1);
            }
            printf("fd = %d
    ",fir_fd);
            printf("read fir devide successful
    ");
            ioctl(fir_fd,1,0);
            for(i=0;i<60;i++)
            {
                    if(i==0)
                    {
                            val = 1;
                            ioctl(fir_fd,3,&val);
                    }
                    else
                    {
                            val = 0;
                            ioctl(fir_fd,3,&val);
                            printf("%d:%ld
    ",i,val);
                    }
    
            }
            //ioctl(fir_fd,3,val);
            printf("done");
    }

    3.3 mapper.cpp

    #include <iostream>
    #include <stdio.h>
    #include <string>
    #include <fcntl.h>
    #include <sys/ioctl.h>
    using namespace std;
    
    
    int fir_fd;
    
    int main()
    {
            string str;
            int i;
            fir_fd = open("/dev/fir_mod", 0);
    //      printf("%d
    ",fir_fd);
    
            while(cin>>str)
            {
                    //the key is the original value
                    //cout << str << "	";
    
                    //firstly, init fir with 58 zero
                    char out_val[4];
    //              printf("debug");
                    ioctl(fir_fd,1,0);
    
                    //secondly, do the real fir
                    int len = str.length()/4;
                    for(i=0;i<len;i++)
                    {
                            str.copy(out_val+3,1,i*4+3);
                            str.copy(out_val+2,1,i*4+2);
                            str.copy(out_val+1,1,i*4+1);
                            str.copy(out_val+0,1,i*4+0);
                            ioctl(fir_fd,3,(long*)out_val);
                            cout << out_val[0] << out_val[1]<<out_val[2] << out_val[
    3];
                    }
                    //release sem
                    ioctl(fir_fd,5,0);
    
                    //the value is fir out, and follow a line finish
                    cout << endl;
            }
            return 0;
    }

    3.4 reducer.cpp

    #include <iostream>
    #include <string>
    //#include <map>
    
    using namespace std;
    
    /*
    int main() {
     map<string,int> firMap;
      map<string,int>::iterator it;
     string key;
     int value;
     int count;
     while(cin>>key>>value) {
      firMap[key] +=value;
     }
     for(it=firMap.begin();it != firMap.end();it++) {
      cout<<it->first<<"	"<<it->second<<endl;
     }
     return 0;
    }
    */
    int main()
    {
    
            string str;
            while(cin >> str)
                    cout << str;
            return 0;
    }

    3.5 test scripts:

    #! /bin/sh -
    
    #indir=/in
    indir=/fir/signal100.dat
    outdir=/out/hw100v3
    
    cd /root/
    if [ `ls /dev/ | grep fir_mod` -eq `echo` ]
    then
            ./install_fir.sh
    fi
    
    echo fir run successful!
    
    cd /root/hadoop-2.4.0/
    bin/hadoop jar hadoop-streaming-2.4.0.jar 
    -D mapred.map.tasks=1 
    -D mapred.reduce.tasks=1 
    -file /root/hw_fir/Mapper -mapper /root/hw_fir/Mapper 
    -file /root/hw_fir/Reducer -reducer /root/hw_fir/Reducer 
    -input $indir -output $outdir
    
    echo test now finished.

    done

  • 相关阅读:
    FileUpload1上传控件
    docker如何push镜像到docker hub个人的仓库
    docker的ubuntu镜像无ifconfig和ping命令
    keystone同步数据库的时候提示error
    openstack安装dashboard后访问horizon出错 500 or 504
    装了ubuntu之后,只能进入ubuntu系统,不能进入windows系统
    Kernal Panic
    无法获得锁 /var/lib/dpkg/lock -open
    用户 'NT AUTHORITYIUSR' 登录失败
    配置错误:不能在此路径中使用此配置节。
  • 原文地址:https://www.cnblogs.com/shenerguang/p/3891906.html
Copyright © 2011-2022 走看看