1.Prepare Hadoop Streaming
Hadoop streaming allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
1.1.Download Hadoop Streaming fit for your hadoop version
For hadoop2.4.0, you can visit the following website and download the jar file:
http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-streaming/2.4.0
Than put the jar file to the $HADOOP_HOME folder.
1.2.Hadoop-streaming test in my cluster:
bin/hadoop jar hadoop-streaming-2.4.0.jar -input /in -output /out2 -mapper /bin/cat
-reducer /usr/bin/wc
1.3.Other map reduce demo here:
Python:
http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
C++:
http://blog.sina.com.cn/s/blog_62a9902f01018rzj.html
2.FIR hardware:
This is the code provided by xilinx workshop for audio application, we just use it to do our tests.
2.1.fir.h
#ifndef _FIR_H_ #define _FIR_H_ #include "ap_cint.h" #define N 59 #define SAMPLES N+10 // just few more samples then number of taps typedef long coef_t; typedef long data_t; typedef long acc_t; #endif
2.2.fir_coef.dat
-378, -73, 27, 170, 298, 352, 302, 168, 14, -80, -64, 53, 186, 216, 40, -356, -867, -1283, -1366, -954, -51, 1132, 2227, 2829, 2647, 1633, 25, -1712, -3042, 29229, -3042, -1712, 25, 1633, 2647, 2829, 2227, 1132, -51, -954, -1366, -1283, -867, -356, 40, 216, 186, 53, -64, -80, 14, 168, 302, 352, 298, 170, 27, -73, -378
2.3.fir.c
#include "fir.h"
void fir (
data_t *y,
data_t x
) {
const coef_t c[N]={
#include "fir_coef.dat"
};
static data_t shift_reg[N];
acc_t acc;
int i;
acc=(acc_t)shift_reg[N-2]*(acc_t)c[N-1];
loop: for (i=N-2;i!=0;i--) {
acc+=(acc_t)shift_reg[i-1]*(acc_t)c[i];
shift_reg[i]=shift_reg[i-1];
}
acc+=(acc_t)x*(acc_t)c[0];
shift_reg[0]=x;
*y = acc;
}
2.4.fir_test.c
#include <stdio.h>
#include <math.h>
#include "fir.h"
void fir (
data_t *y,
data_t x
);
int main () {
FILE *fp;
data_t signal, output;
fp=fopen("fir_impulse.dat","w");
int i;
for (i=0;i<SAMPLES;i++) {
if(i==0)
signal = 0x8000;
else
signal = 0;
fir(&output,signal);
printf("%i %d %d
",i,(int)signal,(int)output);
// fprintf(fp,"%i %d %d
",i,signal,output);
}
fclose(fp);
return 0;
}
2.5 Pictures
Here is the project in Vivado HLS:
Here is the Directive View:
In vivado Project Settings:
Block Design:
Reg Address:
2.6 Stand alone application:
#include <stdio.h>
#include "platform.h"
#include "xfir_hw.h"
#include "xparameters.h"
#include "xil_io.h"
#include "xfir.h"
#define SAMPLES 68
int main()
{
int i = 0;
int k=0;
long signal, out;
u32 sample;
init_platform();
print("Hello World
");
//filter_hw_accel_input(&out,signal);
print("debug");
for (i=0;i<59;i++) {
if(i==0)
signal = 0;
else
signal = 0;
filter_hw_accel_input(&out,signal);
}
for (i=0;i<SAMPLES;i++) {
if(i==0)
signal = 1;
else
signal = 0;
filter_hw_accel_input(&out,signal);
//printf("in=0x%x ",signal);
printf("out=%ld
",out);
printf("i = %d
",i);
sleep(1);
}
sample = 0x1111;
Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA,sample);
Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel
Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0);
while(1){
if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL))
break;
else
continue;
}
sample = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA);
printf("sample=0x%x
",sample);
}
void filter_hw_accel_input(long * Sample_out, long Sample_in)
{
Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA, Sample_in); // send left channel sample
Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel
Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0);
while(1){
if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL))
break;
else
continue;
}
*Sample_out = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA);
}
2.7 standalone application configurations:
Repo should be set to the folder that contains the driver folder in fir HLS project.
Board Support Package setting:
3 Linux:
3.1 driver:
#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/delay.h>
#include <linux/spinlock.h>
#include <linux/device.h>
#include <linux/types.h>
#include <linux/ioctl.h>
#include <asm/io.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>
#include <linux/wait.h>
#include <linux/cdev.h>
#include <linux/interrupt.h>
#include <asm/signal.h>
#include <linux/gpio.h>
#include <linux/irq.h>
#include <linux/semaphore.h>
#define DEVICE_NAME "fir_dev"
#define FIR_BASEADDR 0x43C00000
#define XFIR_DATA_IN_OFFSET 0x1C
#define XFIR_DATA_OUT_OFFSET 0x14
#define XFIR_CTRL_OFFSET 0x00
#define XFIR_DONE_OFFSET 0x10
static unsigned long fir_addr = 0;
static struct class* fir_class = NULL;
static struct device* fir_device = NULL;
static int fir_major = 0;
static struct semaphore sem;
//arg means the led number, cmd controls it on or off
static ssize_t fir_ioctl(struct file *file, unsigned int cmd, long *arg)
{
//printk("fun(gpio_ioctl):");
long status = 0xff;
int ret,ii;
printk("******** in ioctl ************
");
switch(cmd){
case 0:
case 1:
//init fir put 59 zero
for(ii=0;ii<59;ii++)
{
iowrite32(0x0,fir_addr+XFIR_DATA_IN_OFFSET);
iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET);
iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET);
while(!ioread32(fir_addr+XFIR_DONE_OFFSET));
}
//now read data from fir y
//status = ioread32(fir_addr+FIR_DATA_OUT_OFFSET);
printk("59 zeros has been put to fir!!
");
return 0;
case 3:
printk("in data = %ld
",*arg);
iowrite32(*arg,fir_addr+XFIR_DATA_IN_OFFSET);
iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET);
iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET);
//now wait for fir generate ok
while(!ioread32(fir_addr+XFIR_DONE_OFFSET));
//now read data from fir y
status = ioread32(fir_addr+XFIR_DATA_OUT_OFFSET);
ret = __put_user(status, (long *)arg);
printk("out data = 0x%x
",status);
return 0;
case 5:
up(&sem);
printk("sema up
");
return 0;
default:
printk("default cmd=%d
",cmd);
return -EINVAL;
}
}
int fir_open(struct inode *inode, struct file *filp)
{
sema_init(&sem,1);
down(&sem);
printk("sema down
");
return 0;
}
static struct file_operations fir_fops = {
.owner = THIS_MODULE,
.unlocked_ioctl = fir_ioctl,
.open = fir_open,
};
static int __init fir_init(void)
{
int ret;
ret = register_chrdev(0,DEVICE_NAME, &fir_fops);
if(ret < 0)
{
printk("fir: can't get major number
");
return ret;
}
fir_major = ret;
fir_class = class_create(THIS_MODULE, "fir_class");
if(IS_ERR(fir_class))
{
printk("fir: failed in creating class
");
unregister_chrdev(fir_major, DEVICE_NAME);
return -1;
}
fir_device = device_create(fir_class,NULL,MKDEV(fir_major,0),NULL,DEVICE_NAME);
if(IS_ERR(fir_device))
{
printk("fir: failed in creating device!
");
unregister_chrdev(fir_major, DEVICE_NAME);
class_unregister(fir_class);
class_destroy(fir_class);
return -1;
}
fir_addr = (unsigned long) ioremap(FIR_BASEADDR, sizeof(u32));
printk("fir installed successfully!
");
return 0;
}
static void __exit fir_exit(void)
{
device_destroy(fir_class,MKDEV(fir_major, 0));
class_unregister(fir_class);
class_destroy(fir_class);
unregister_chrdev(fir_major,DEVICE_NAME);
printk("fir module exit!");
}
module_init(fir_init);
module_exit(fir_exit);
MODULE_AUTHOR("seg");
MODULE_LICENSE("GPL");
Makefile:
# Makefile for globalmm Driver
# Program start 2014.3.14
# 如果已经定义了KERNELRELEASE,则说明从内核构造系统调用的。
# 因此可以利用其内建语句
ifneq ($(KERNELRELEASE),)
obj-m := fir_driver.o
# 否则,要直接从命令行调用
# 这时要调用内核构造系统
else
KERNELDIR = /root/zybo_gpio/linux-digilent/
PWD := $(shell pwd)
default:
$(MAKE) -C $(KERNELDIR) M=$(PWD) modules ARCH=arm
clean:
rm -rf *.o *~ core .depend .*.cmd *.ko *.mod.c .tmp_versions modules.* Module.*
endif
3.2 test app:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
static int fir_fd;
int main(void)
{
int i;
long val;
// open device
fir_fd = open("/dev/fir_mod", 0);
if (fir_fd < 0) {
perror("open device gpio_dev error!
");
exit(1);
}
printf("fd = %d
",fir_fd);
printf("read fir devide successful
");
ioctl(fir_fd,1,0);
for(i=0;i<60;i++)
{
if(i==0)
{
val = 1;
ioctl(fir_fd,3,&val);
}
else
{
val = 0;
ioctl(fir_fd,3,&val);
printf("%d:%ld
",i,val);
}
}
//ioctl(fir_fd,3,val);
printf("done");
}
3.3 mapper.cpp
#include <iostream>
#include <stdio.h>
#include <string>
#include <fcntl.h>
#include <sys/ioctl.h>
using namespace std;
int fir_fd;
int main()
{
string str;
int i;
fir_fd = open("/dev/fir_mod", 0);
// printf("%d
",fir_fd);
while(cin>>str)
{
//the key is the original value
//cout << str << " ";
//firstly, init fir with 58 zero
char out_val[4];
// printf("debug");
ioctl(fir_fd,1,0);
//secondly, do the real fir
int len = str.length()/4;
for(i=0;i<len;i++)
{
str.copy(out_val+3,1,i*4+3);
str.copy(out_val+2,1,i*4+2);
str.copy(out_val+1,1,i*4+1);
str.copy(out_val+0,1,i*4+0);
ioctl(fir_fd,3,(long*)out_val);
cout << out_val[0] << out_val[1]<<out_val[2] << out_val[
3];
}
//release sem
ioctl(fir_fd,5,0);
//the value is fir out, and follow a line finish
cout << endl;
}
return 0;
}
3.4 reducer.cpp
#include <iostream>
#include <string>
//#include <map>
using namespace std;
/*
int main() {
map<string,int> firMap;
map<string,int>::iterator it;
string key;
int value;
int count;
while(cin>>key>>value) {
firMap[key] +=value;
}
for(it=firMap.begin();it != firMap.end();it++) {
cout<<it->first<<" "<<it->second<<endl;
}
return 0;
}
*/
int main()
{
string str;
while(cin >> str)
cout << str;
return 0;
}
3.5 test scripts:
#! /bin/sh -
#indir=/in
indir=/fir/signal100.dat
outdir=/out/hw100v3
cd /root/
if [ `ls /dev/ | grep fir_mod` -eq `echo` ]
then
./install_fir.sh
fi
echo fir run successful!
cd /root/hadoop-2.4.0/
bin/hadoop jar hadoop-streaming-2.4.0.jar
-D mapred.map.tasks=1
-D mapred.reduce.tasks=1
-file /root/hw_fir/Mapper -mapper /root/hw_fir/Mapper
-file /root/hw_fir/Reducer -reducer /root/hw_fir/Reducer
-input $indir -output $outdir
echo test now finished.
done






