zoukankan      html  css  js  c++  java
  • hadoop下的Kmeans算法实现

    转自:

    hadoop下的Kmeans算法实现一

    前一段时间,从配置hadoop到运行kmeans的mapreduce程序,着实让我纠结了几天,昨天终于把前面遇到的配置问题和程序运行问题搞定。Kmeans算法看起来很简单,但对于第一次接触mapreduce程序来说,还是有些挑战,还好基本都搞明白了。Kmeans算法是从网上下的在此分析一下过程。

    Kmeans.java

    [java] view plaincopy
    1. import org.apache.hadoop.conf.Configuration;  
    2. import org.apache.hadoop.fs.FileSystem;  
    3. import org.apache.hadoop.fs.Path;  
    4. import org.apache.hadoop.io.Text;  
    5. import org.apache.hadoop.mapreduce.Job;  
    6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    8.   
    9. public class KMeans {  
    10.       
    11.     public static void main(String[] args) throws Exception  
    12.     {  
    13.         CenterInitial centerInitial = new CenterInitial();  
    14.         centerInitial.run(args);//初始化中心点  
    15.         int times=0;  
    16.         double s = 0,shold = 0.1;//shold是预制。  
    17.         do {  
    18.             Configuration conf = new Configuration();  
    19.             conf.set("fs.default.name""hdfs://localhost:9000");  
    20.             Job job = new Job(conf,"KMeans");//建立KMeans的MapReduce作业  
    21.             job.setJarByClass(KMeans.class);//设定作业的启动类  
    22.             job.setOutputKeyClass(Text.class);//设定Key输出的格式:Text  
    23.             job.setOutputValueClass(Text.class);//设定value输出的格式:Text  
    24.             job.setMapperClass(KMapper.class);//设定Mapper类  
    25.             job.setMapOutputKeyClass(Text.class);  
    26.             job.setMapOutputValueClass(Text.class);//设定Reducer类  
    27.             job.setReducerClass(KReducer.class);  
    28.             FileSystem fs = FileSystem.get(conf);  
    29.             fs.delete(new Path(args[2]),true);//args[2]是output目录,fs.delete是将已存在的output删除  
    30.                         //解析输入和输出参数,分别作为作业的输入和输出,都是文件   
    31.                         FileInputFormat.addInputPath(job, new Path(args[0]));  
    32.             FileOutputFormat.setOutputPath(job, new Path(args[2]));  
    33.                         //运行作业并判断是否完成成功  
    34.                         job.waitForCompletion(true);  
    35.             if(job.waitForCompletion(true))//上一次mapreduce过程结束  
    36.             {  
    37.                                 //上两个中心点做比较,如果中心点之间的距离小于阈值就停止;如果距离大于阈值,就把最近的中心点作为新中心点  
    38.                                 NewCenter newCenter = new NewCenter();  
    39.                 s = newCenter.run(args);  
    40.                 times++;  
    41.             }  
    42.         } while(s > shold);//当误差小于阈值停止。  
    43.         System.out.println("Iterator: " + times);//迭代次数       
    44.     }  
    45.   
    46. }  
    问题:args[]是什么,这个问题纠结了几日才得到答案,args[]就是最开始向程序中传递的参数,具体在Run Configurations里配置,如下

    hdfs://localhost:9000/home/administrator/hadoop/kmeans/input hdfs://localhost:9000/home/administrator/hadoop/kmeans hdfs://localhost:9000/home/administrator/hadoop/kmeans/output

    代码的功能在程序中注释。

    hadoop下的Kmeans算法实现二

    输入数据,保存在2.txt中:(1,1) (9,9) (2,3) (10,30) (4,4) (34,40) (5,6) (15,20)

    3.txt用于保存临时的中心

    part-r-00000用于保存reduce的结果

    程序的mapreduce过程及结果:

    [java] view plaincopy
    1. 初始化过程:(10,30) (2,3)   
    2. 13/01/26 08:58:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
    3. 13/01/26 08:58:38 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
    4. 13/01/26 08:58:38 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).  
    5. 13/01/26 08:58:38 INFO input.FileInputFormat: Total input paths to process : 2  
    6. 13/01/26 08:58:38 WARN snappy.LoadSnappy: Snappy native library not loaded  
    7. 13/01/26 08:58:38 INFO mapred.JobClient: Running job: job_local_0001  
    8. 13/01/26 08:58:39 INFO util.ProcessTree: setsid exited with exit code 0  
    9. 13/01/26 08:58:39 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@15718f2  
    10. 13/01/26 08:58:39 INFO mapred.MapTask: io.sort.mb = 100  
    11. 13/01/26 08:58:39 INFO mapred.MapTask: data buffer = 79691776/99614720  
    12. 13/01/26 08:58:39 INFO mapred.MapTask: record buffer = 262144/327680  
    13. 0list:1  
    14. 0c:10  
    15. 1list:1  
    16. 1c:30  
    17. 中心点(2,3)对应坐标(1,1)  
    18. Mapper输出:(2,3) (1,1)  
    19. 0list:9  
    20. 0c:10  
    21. 1list:9  
    22. 1c:30  
    23. 中心点(2,3)对应坐标(9,9)  
    24. Mapper输出:(2,3) (9,9)  
    25. 0list:2  
    26. 0c:10  
    27. 1list:3  
    28. 1c:30  
    29. 中心点(2,3)对应坐标(2,3)  
    30. Mapper输出:(2,3) (2,3)  
    31. 0list:10  
    32. 0c:10  
    33. 1list:30  
    34. 1c:30  
    35. 中心点(10,30)对应坐标(10,30)  
    36. Mapper输出:(10,30) (10,30)  
    37. 0list:4  
    38. 0c:10  
    39. 1list:4  
    40. 1c:30  
    41. 中心点(2,3)对应坐标(4,4)  
    42. Mapper输出:(2,3) (4,4)  
    43. 0list:34  
    44. 0c:10  
    45. 1list:40  
    46. 1c:30  
    47. 中心点(10,30)对应坐标(34,40)  
    48. Mapper输出:(10,30) (34,40)  
    49. 0list:5  
    50. 0c:10  
    51. 1list:6  
    52. 1c:30  
    53. 中心点(2,3)对应坐标(5,6)  
    54. Mapper输出:(2,3) (5,6)  
    55. 0list:15  
    56. 0c:10  
    57. 1list:20  
    58. 1c:30  
    59. 中心点(10,30)对应坐标(15,20)  
    60. Mapper输出:(10,30) (15,20)  
    61. 13/01/26 08:58:39 INFO mapred.MapTask: Starting flush of map output  
    62. 13/01/26 08:58:39 INFO mapred.MapTask: Finished spill 0  
    63. 13/01/26 08:58:39 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting  
    64. 13/01/26 08:58:39 INFO mapred.JobClient:  map 0% reduce 0%  
    65. 13/01/26 08:58:42 INFO mapred.LocalJobRunner:   
    66. 13/01/26 08:58:42 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.  
    67. 13/01/26 08:58:42 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@77eaf8  
    68. 13/01/26 08:58:42 INFO mapred.MapTask: io.sort.mb = 100  
    69. 13/01/26 08:58:42 INFO mapred.MapTask: data buffer = 79691776/99614720  
    70. 13/01/26 08:58:42 INFO mapred.MapTask: record buffer = 262144/327680  
    71. 0list:2  
    72. 0c:10  
    73. 1list:3  
    74. 1c:30  
    75. 中心点(2,3)对应坐标(2,3)  
    76. Mapper输出:(2,3) (2,3)  
    77. 0list:10  
    78. 0c:10  
    79. 1list:30  
    80. 1c:30  
    81. 中心点(10,30)对应坐标(10,30)  
    82. Mapper输出:(10,30) (10,30)  
    83. 0list:34  
    84. 0c:10  
    85. 1list:40  
    86. 1c:30  
    87. 中心点(10,30)对应坐标(34,40)  
    88. Mapper输出:(10,30) (34,40)  
    89. 0list:1  
    90. 0c:10  
    91. 1list:1  
    92. 1c:30  
    93. 中心点(2,3)对应坐标(1,1)  
    94. Mapper输出:(2,3) (1,1)  
    95. 13/01/26 08:58:42 INFO mapred.MapTask: Starting flush of map output  
    96. 13/01/26 08:58:42 INFO mapred.MapTask: Finished spill 0  
    97. 13/01/26 08:58:42 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting  
    98. 13/01/26 08:58:42 INFO mapred.JobClient:  map 100% reduce 0%  
    99. 13/01/26 08:58:45 INFO mapred.LocalJobRunner:   
    100. 13/01/26 08:58:45 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.  
    101. 13/01/26 08:58:45 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@18d7ace  
    102. 13/01/26 08:58:45 INFO mapred.LocalJobRunner:   
    103. 13/01/26 08:58:45 INFO mapred.Merger: Merging 2 sorted segments  
    104. 13/01/26 08:58:45 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 192 bytes  
    105. 13/01/26 08:58:45 INFO mapred.LocalJobRunner:   
    106. Reduce过程第一次  
    107. (10,30)Reduce  
    108. val:(10,30)  
    109. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    110. temlength:2  
    111. val:(34,40)  
    112. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    113. temlength:2  
    114. val:(10,30)  
    115. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    116. temlength:2  
    117. val:(34,40)  
    118. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    119. temlength:2  
    120. val:(15,20)  
    121. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    122. temlength:2  
    123. count:5  
    124. outVal:(10,30) (34,40) (10,30) (34,40) (15,20) /outVal  
    125. ave0i103.0  
    126. ave1i160.0  
    127. 写入part:(10,30) (10,30) (34,40) (10,30) (34,40) (15,20)  (20.6,32.0)  
    128. Reduce过程第一次  
    129. (2,3)Reduce  
    130. val:(1,1)  
    131. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    132. temlength:2  
    133. val:(9,9)  
    134. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    135. temlength:2  
    136. val:(2,3)  
    137. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    138. temlength:2  
    139. val:(4,4)  
    140. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    141. temlength:2  
    142. val:(5,6)  
    143. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    144. temlength:2  
    145. val:(2,3)  
    146. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    147. temlength:2  
    148. val:(1,1)  
    149. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@141fab6  
    150. temlength:2  
    151. count:7  
    152. outVal:(1,1) (9,9) (2,3) (4,4) (5,6) (2,3) (1,1) /outVal  
    153. ave0i24.0  
    154. ave1i27.0  
    155. 写入part:(2,3) (1,1) (9,9) (2,3) (4,4) (5,6) (2,3) (1,1)  (3.4285715,3.857143)  
    156. 13/01/26 08:58:45 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting  
    157. 13/01/26 08:58:45 INFO mapred.LocalJobRunner:   
    158. 13/01/26 08:58:45 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now  
    159. 13/01/26 08:58:45 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/home/administrator/hadoop/kmeans/output  
    160. 13/01/26 08:58:48 INFO mapred.LocalJobRunner: reduce > reduce  
    161. 13/01/26 08:58:48 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.  
    162. 13/01/26 08:58:48 INFO mapred.JobClient:  map 100% reduce 100%  
    163. 13/01/26 08:58:48 INFO mapred.JobClient: Job complete: job_local_0001  
    164. 13/01/26 08:58:48 INFO mapred.JobClient: Counters: 22  
    165. 13/01/26 08:58:48 INFO mapred.JobClient:   File Output Format Counters   
    166. 13/01/26 08:58:48 INFO mapred.JobClient:     Bytes Written=129  
    167. 13/01/26 08:58:48 INFO mapred.JobClient:   FileSystemCounters  
    168. 13/01/26 08:58:48 INFO mapred.JobClient:     FILE_BYTES_READ=1818  
    169. 13/01/26 08:58:48 INFO mapred.JobClient:     HDFS_BYTES_READ=450  
    170. 13/01/26 08:58:48 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=122901  
    171. 13/01/26 08:58:48 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=171  
    172. 13/01/26 08:58:48 INFO mapred.JobClient:   File Input Format Counters   
    173. 13/01/26 08:58:48 INFO mapred.JobClient:     Bytes Read=82  
    174. 13/01/26 08:58:48 INFO mapred.JobClient:   Map-Reduce Framework  
    175. 13/01/26 08:58:48 INFO mapred.JobClient:     Map output materialized bytes=200  
    176. 13/01/26 08:58:48 INFO mapred.JobClient:     Map input records=2  
    177. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce shuffle bytes=0  
    178. 13/01/26 08:58:48 INFO mapred.JobClient:     Spilled Records=24  
    179. 13/01/26 08:58:48 INFO mapred.JobClient:     Map output bytes=164  
    180. 13/01/26 08:58:48 INFO mapred.JobClient:     Total committed heap usage (bytes)=498860032  
    181. 13/01/26 08:58:48 INFO mapred.JobClient:     CPU time spent (ms)=0  
    182. 13/01/26 08:58:48 INFO mapred.JobClient:     SPLIT_RAW_BYTES=262  
    183. 13/01/26 08:58:48 INFO mapred.JobClient:     Combine input records=0  
    184. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce input records=12  
    185. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce input groups=2  
    186. 13/01/26 08:58:48 INFO mapred.JobClient:     Combine output records=0  
    187. 13/01/26 08:58:48 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0  
    188. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce output records=2  
    189. 13/01/26 08:58:48 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0  
    190. 13/01/26 08:58:48 INFO mapred.JobClient:     Map output records=12  
    191. 13/01/26 08:58:48 INFO mapred.JobClient: Running job: job_local_0001  
    192. 13/01/26 08:58:48 INFO mapred.JobClient: Job complete: job_local_0001  
    193. 13/01/26 08:58:48 INFO mapred.JobClient: Counters: 22  
    194. 13/01/26 08:58:48 INFO mapred.JobClient:   File Output Format Counters   
    195. 13/01/26 08:58:48 INFO mapred.JobClient:     Bytes Written=129  
    196. 13/01/26 08:58:48 INFO mapred.JobClient:   FileSystemCounters  
    197. 13/01/26 08:58:48 INFO mapred.JobClient:     FILE_BYTES_READ=1818  
    198. 13/01/26 08:58:48 INFO mapred.JobClient:     HDFS_BYTES_READ=450  
    199. 13/01/26 08:58:48 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=122901  
    200. 13/01/26 08:58:48 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=171  
    201. 13/01/26 08:58:48 INFO mapred.JobClient:   File Input Format Counters   
    202. 13/01/26 08:58:48 INFO mapred.JobClient:     Bytes Read=82  
    203. 13/01/26 08:58:48 INFO mapred.JobClient:   Map-Reduce Framework  
    204. 13/01/26 08:58:48 INFO mapred.JobClient:     Map output materialized bytes=200  
    205. 13/01/26 08:58:48 INFO mapred.JobClient:     Map input records=2  
    206. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce shuffle bytes=0  
    207. 13/01/26 08:58:48 INFO mapred.JobClient:     Spilled Records=24  
    208. 13/01/26 08:58:48 INFO mapred.JobClient:     Map output bytes=164  
    209. 13/01/26 08:58:48 INFO mapred.JobClient:     Total committed heap usage (bytes)=498860032  
    210. 13/01/26 08:58:48 INFO mapred.JobClient:     CPU time spent (ms)=0  
    211. 13/01/26 08:58:48 INFO mapred.JobClient:     SPLIT_RAW_BYTES=262  
    212. 13/01/26 08:58:48 INFO mapred.JobClient:     Combine input records=0  
    213. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce input records=12  
    214. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce input groups=2  
    215. 13/01/26 08:58:48 INFO mapred.JobClient:     Combine output records=0  
    216. 13/01/26 08:58:48 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0  
    217. 13/01/26 08:58:48 INFO mapred.JobClient:     Reduce output records=2  
    218. 13/01/26 08:58:48 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0  
    219. 13/01/26 08:58:48 INFO mapred.JobClient:     Map output records=12  
    220. 上一次MapReduce结果:第一行:(10,30)  (10,30) (34,40) (10,30) (34,40) (15,20) (20.6,32.0)  
    221. 第二行:(2,3)   (1,1) (9,9) (2,3) (4,4) (5,6) (2,3) (1,1) (3.4285715,3.857143)  
    222. 。  
    223. 0坐标距离:116.36001  
    224. 1坐标距离:2.7755103  
    225. 新中心点:(20.6,32.0) (3.4285715,3.857143)   
    226. 13/01/26 08:58:49 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
    227. 13/01/26 08:58:49 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).  
    228. 13/01/26 08:58:49 INFO input.FileInputFormat: Total input paths to process : 2  
    229. 13/01/26 08:58:49 INFO mapred.JobClient: Running job: job_local_0002  
    230. 13/01/26 08:58:49 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@18aab40  
    231. 13/01/26 08:58:49 INFO mapred.MapTask: io.sort.mb = 100  
    232. 13/01/26 08:58:49 INFO mapred.MapTask: data buffer = 79691776/99614720  
    233. 13/01/26 08:58:49 INFO mapred.MapTask: record buffer = 262144/327680  
    234. 0list:1  
    235. 0c:20.6  
    236. 1list:1  
    237. 1c:32.0  
    238. 中心点(3.4285715,3.857143)对应坐标(1,1)  
    239. Mapper输出:(3.4285715,3.857143) (1,1)  
    240. 0list:9  
    241. 0c:20.6  
    242. 1list:9  
    243. 1c:32.0  
    244. 中心点(3.4285715,3.857143)对应坐标(9,9)  
    245. Mapper输出:(3.4285715,3.857143) (9,9)  
    246. 0list:2  
    247. 0c:20.6  
    248. 1list:3  
    249. 1c:32.0  
    250. 中心点(3.4285715,3.857143)对应坐标(2,3)  
    251. Mapper输出:(3.4285715,3.857143) (2,3)  
    252. 0list:10  
    253. 0c:20.6  
    254. 1list:30  
    255. 1c:32.0  
    256. 中心点(20.6,32.0)对应坐标(10,30)  
    257. Mapper输出:(20.6,32.0) (10,30)  
    258. 0list:4  
    259. 0c:20.6  
    260. 1list:4  
    261. 1c:32.0  
    262. 中心点(3.4285715,3.857143)对应坐标(4,4)  
    263. Mapper输出:(3.4285715,3.857143) (4,4)  
    264. 0list:34  
    265. 0c:20.6  
    266. 1list:40  
    267. 1c:32.0  
    268. 中心点(20.6,32.0)对应坐标(34,40)  
    269. Mapper输出:(20.6,32.0) (34,40)  
    270. 0list:5  
    271. 0c:20.6  
    272. 1list:6  
    273. 1c:32.0  
    274. 中心点(3.4285715,3.857143)对应坐标(5,6)  
    275. Mapper输出:(3.4285715,3.857143) (5,6)  
    276. 0list:15  
    277. 0c:20.6  
    278. 1list:20  
    279. 1c:32.0  
    280. 中心点(20.6,32.0)对应坐标(15,20)  
    281. Mapper输出:(20.6,32.0) (15,20)  
    282. 13/01/26 08:58:49 INFO mapred.MapTask: Starting flush of map output  
    283. 13/01/26 08:58:49 INFO mapred.MapTask: Finished spill 0  
    284. 13/01/26 08:58:49 INFO mapred.Task: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting  
    285. 13/01/26 08:58:50 INFO mapred.JobClient:  map 0% reduce 0%  
    286. 13/01/26 08:58:52 INFO mapred.LocalJobRunner:   
    287. 13/01/26 08:58:52 INFO mapred.Task: Task 'attempt_local_0002_m_000000_0' done.  
    288. 13/01/26 08:58:52 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@147358f  
    289. 13/01/26 08:58:52 INFO mapred.MapTask: io.sort.mb = 100  
    290. 13/01/26 08:58:52 INFO mapred.MapTask: data buffer = 79691776/99614720  
    291. 13/01/26 08:58:52 INFO mapred.MapTask: record buffer = 262144/327680  
    292. 0list:2  
    293. 0c:20.6  
    294. 1list:3  
    295. 1c:32.0  
    296. 中心点(3.4285715,3.857143)对应坐标(2,3)  
    297. Mapper输出:(3.4285715,3.857143) (2,3)  
    298. 0list:10  
    299. 0c:20.6  
    300. 1list:30  
    301. 1c:32.0  
    302. 中心点(20.6,32.0)对应坐标(10,30)  
    303. Mapper输出:(20.6,32.0) (10,30)  
    304. 0list:34  
    305. 0c:20.6  
    306. 1list:40  
    307. 1c:32.0  
    308. 中心点(20.6,32.0)对应坐标(34,40)  
    309. Mapper输出:(20.6,32.0) (34,40)  
    310. 0list:1  
    311. 0c:20.6  
    312. 1list:1  
    313. 1c:32.0  
    314. 中心点(3.4285715,3.857143)对应坐标(1,1)  
    315. Mapper输出:(3.4285715,3.857143) (1,1)  
    316. 13/01/26 08:58:52 INFO mapred.MapTask: Starting flush of map output  
    317. 13/01/26 08:58:52 INFO mapred.MapTask: Finished spill 0  
    318. 13/01/26 08:58:52 INFO mapred.Task: Task:attempt_local_0002_m_000001_0 is done. And is in the process of commiting  
    319. 13/01/26 08:58:53 INFO mapred.JobClient:  map 100% reduce 0%  
    320. 13/01/26 08:58:55 INFO mapred.LocalJobRunner:   
    321. 13/01/26 08:58:55 INFO mapred.Task: Task 'attempt_local_0002_m_000001_0' done.  
    322. 13/01/26 08:58:55 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2798e7  
    323. 13/01/26 08:58:55 INFO mapred.LocalJobRunner:   
    324. 13/01/26 08:58:55 INFO mapred.Merger: Merging 2 sorted segments  
    325. 13/01/26 08:58:55 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 317 bytes  
    326. 13/01/26 08:58:55 INFO mapred.LocalJobRunner:   
    327. Reduce过程第一次  
    328. (20.6,32.0)Reduce  
    329. val:(10,30)  
    330. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    331. temlength:2  
    332. val:(34,40)  
    333. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    334. temlength:2  
    335. val:(10,30)  
    336. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    337. temlength:2  
    338. val:(34,40)  
    339. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    340. temlength:2  
    341. val:(15,20)  
    342. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    343. temlength:2  
    344. count:5  
    345. outVal:(10,30) (34,40) (10,30) (34,40) (15,20) /outVal  
    346. ave0i103.0  
    347. ave1i160.0  
    348. 写入part:(20.6,32.0) (10,30) (34,40) (10,30) (34,40) (15,20)  (20.6,32.0)  
    349. Reduce过程第一次  
    350. (3.4285715,3.857143)Reduce  
    351. val:(1,1)  
    352. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    353. temlength:2  
    354. val:(9,9)  
    355. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    356. temlength:2  
    357. val:(2,3)  
    358. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    359. temlength:2  
    360. val:(4,4)  
    361. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    362. temlength:2  
    363. val:(5,6)  
    364. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    365. temlength:2  
    366. val:(2,3)  
    367. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    368. temlength:2  
    369. val:(1,1)  
    370. values:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@13043d2  
    371. temlength:2  
    372. count:7  
    373. outVal:(1,1) (9,9) (2,3) (4,4) (5,6) (2,3) (1,1) /outVal  
    374. ave0i24.0  
    375. ave1i27.0  
    376. 写入part:(3.4285715,3.857143) (1,1) (9,9) (2,3) (4,4) (5,6) (2,3) (1,1)  (3.4285715,3.857143)  
    377. 13/01/26 08:58:55 INFO mapred.Task: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting  
    378. 13/01/26 08:58:55 INFO mapred.LocalJobRunner:   
    379. 13/01/26 08:58:55 INFO mapred.Task: Task attempt_local_0002_r_000000_0 is allowed to commit now  
    380. 13/01/26 08:58:55 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0002_r_000000_0' to hdfs://localhost:9000/home/administrator/hadoop/kmeans/output  
    381. 13/01/26 08:58:58 INFO mapred.LocalJobRunner: reduce > reduce  
    382. 13/01/26 08:58:58 INFO mapred.Task: Task 'attempt_local_0002_r_000000_0' done.  
    383. 13/01/26 08:58:59 INFO mapred.JobClient:  map 100% reduce 100%  
    384. 13/01/26 08:58:59 INFO mapred.JobClient: Job complete: job_local_0002  
    385. 13/01/26 08:58:59 INFO mapred.JobClient: Counters: 22  
    386. 13/01/26 08:58:59 INFO mapred.JobClient:   File Output Format Counters   
    387. 13/01/26 08:58:59 INFO mapred.JobClient:     Bytes Written=148  
    388. 13/01/26 08:58:59 INFO mapred.JobClient:   FileSystemCounters  
    389. 13/01/26 08:58:59 INFO mapred.JobClient:     FILE_BYTES_READ=4442  
    390. 13/01/26 08:58:59 INFO mapred.JobClient:     HDFS_BYTES_READ=1262  
    391. 13/01/26 08:58:59 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=246235  
    392. 13/01/26 08:58:59 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=676  
    393. 13/01/26 08:58:59 INFO mapred.JobClient:   File Input Format Counters   
    394. 13/01/26 08:58:59 INFO mapred.JobClient:     Bytes Read=82  
    395. 13/01/26 08:58:59 INFO mapred.JobClient:   Map-Reduce Framework  
    396. 13/01/26 08:58:59 INFO mapred.JobClient:     Map output materialized bytes=325  
    397. 13/01/26 08:58:59 INFO mapred.JobClient:     Map input records=2  
    398. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce shuffle bytes=0  
    399. 13/01/26 08:58:59 INFO mapred.JobClient:     Spilled Records=24  
    400. 13/01/26 08:58:59 INFO mapred.JobClient:     Map output bytes=289  
    401. 13/01/26 08:58:59 INFO mapred.JobClient:     Total committed heap usage (bytes)=667418624  
    402. 13/01/26 08:58:59 INFO mapred.JobClient:     CPU time spent (ms)=0  
    403. 13/01/26 08:58:59 INFO mapred.JobClient:     SPLIT_RAW_BYTES=262  
    404. 13/01/26 08:58:59 INFO mapred.JobClient:     Combine input records=0  
    405. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce input records=12  
    406. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce input groups=2  
    407. 13/01/26 08:58:59 INFO mapred.JobClient:     Combine output records=0  
    408. 13/01/26 08:58:59 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0  
    409. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce output records=2  
    410. 13/01/26 08:58:59 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0  
    411. 13/01/26 08:58:59 INFO mapred.JobClient:     Map output records=12  
    412. 13/01/26 08:58:59 INFO mapred.JobClient: Running job: job_local_0002  
    413. 13/01/26 08:58:59 INFO mapred.JobClient: Job complete: job_local_0002  
    414. 13/01/26 08:58:59 INFO mapred.JobClient: Counters: 22  
    415. 13/01/26 08:58:59 INFO mapred.JobClient:   File Output Format Counters   
    416. 13/01/26 08:58:59 INFO mapred.JobClient:     Bytes Written=148  
    417. 13/01/26 08:58:59 INFO mapred.JobClient:   FileSystemCounters  
    418. 13/01/26 08:58:59 INFO mapred.JobClient:     FILE_BYTES_READ=4442  
    419. 13/01/26 08:58:59 INFO mapred.JobClient:     HDFS_BYTES_READ=1262  
    420. 13/01/26 08:58:59 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=246235  
    421. 13/01/26 08:58:59 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=676  
    422. 13/01/26 08:58:59 INFO mapred.JobClient:   File Input Format Counters   
    423. 13/01/26 08:58:59 INFO mapred.JobClient:     Bytes Read=82  
    424. 13/01/26 08:58:59 INFO mapred.JobClient:   Map-Reduce Framework  
    425. 13/01/26 08:58:59 INFO mapred.JobClient:     Map output materialized bytes=325  
    426. 13/01/26 08:58:59 INFO mapred.JobClient:     Map input records=2  
    427. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce shuffle bytes=0  
    428. 13/01/26 08:58:59 INFO mapred.JobClient:     Spilled Records=24  
    429. 13/01/26 08:58:59 INFO mapred.JobClient:     Map output bytes=289  
    430. 13/01/26 08:58:59 INFO mapred.JobClient:     Total committed heap usage (bytes)=667418624  
    431. 13/01/26 08:58:59 INFO mapred.JobClient:     CPU time spent (ms)=0  
    432. 13/01/26 08:58:59 INFO mapred.JobClient:     SPLIT_RAW_BYTES=262  
    433. 13/01/26 08:58:59 INFO mapred.JobClient:     Combine input records=0  
    434. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce input records=12  
    435. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce input groups=2  
    436. 13/01/26 08:58:59 INFO mapred.JobClient:     Combine output records=0  
    437. 13/01/26 08:58:59 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0  
    438. 13/01/26 08:58:59 INFO mapred.JobClient:     Reduce output records=2  
    439. 13/01/26 08:58:59 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0  
    440. 13/01/26 08:58:59 INFO mapred.JobClient:     Map output records=12  
    441. 上一次MapReduce结果:第一行:(20.6,32.0)  (10,30) (34,40) (10,30) (34,40) (15,20) (20.6,32.0)  
    442. 第二行:(3.4285715,3.857143)    (1,1) (9,9) (2,3) (4,4) (5,6) (2,3) (1,1) (3.4285715,3.857143)  
    443. 。  
    444. 0坐标距离:0.0  
    445. 1坐标距离:0.0  
    446. 新中心点:(20.6,32.0) (3.4285715,3.857143)   
    447. Iterator: 2  

    hadoop下的Kmeans算法实现三

    初始化中心点CenterInitial.java

    [java] view plaincopy
    1. import java.io.ByteArrayInputStream;  
    2. import java.io.ByteArrayOutputStream;  
    3. import java.io.IOException;  
    4. import java.io.OutputStream;  
    5. import java.net.URI;  
    6.   
    7. import org.apache.hadoop.conf.Configuration;  
    8. import org.apache.hadoop.fs.FSDataInputStream;  
    9. import org.apache.hadoop.fs.FileSystem;  
    10. import org.apache.hadoop.fs.Path;  
    11. import org.apache.hadoop.io.IOUtils;  
    12.   
    13.   
    14. public class CenterInitial {  
    15.       
    16.       
    17.     public void run(String[] args) throws IOException  
    18.     {  
    19.         String[] clist;//用于保存中心点  
    20.         int k = 2;//中心点选取个数  
    21.         String string = "";//保存各个中心点在同一个字符串string中  
    22.         String inpath = args[0]+"/4.txt";  //cluster数据集放在2.txt中  
    23.         String outpath = args[1]+"/input2/3.txt";  //center新选取的中心点放进3.txt中保存  
    24.         Configuration conf1 = new Configuration(); //读取hadoop文件系统的配置  
    25.         conf1.set("hadoop.job.ugi""hadoop,hadoop"); //配置信息设置  
    26.         FileSystem fs = FileSystem.get(URI.create(inpath),conf1); //FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统   
    27.         FSDataInputStream in = null;   
    28.         ByteArrayOutputStream out = new ByteArrayOutputStream();  
    29.         try{   
    30.            
    31.             in = fs.open( new Path(inpath) );   
    32.             IOUtils.copyBytes(in,out,50,false);  //用Hadoop的IOUtils工具方法来让这个文件的指定字节复制到标准输出流上   
    33.             //把in读到的数据 复制到out上  
    34.             clist = out.toString().split(" ");//将out以空格为分割符转换成数组在clist中保存  
    35.             } finally {   
    36.                 IOUtils.closeStream(in);  
    37.             }  
    38.           
    39.         FileSystem filesystem = FileSystem.get(URI.create(outpath), conf1); //获得URI对应的HDFS文件系统   
    40.           
    41.         for(int i=0;i<k;i++)  
    42.         {  
    43.             int j=(int) (Math.random()*100) % clist.length;//选取0到clist.lenth-1的随机数  
    44.             if(string.contains(clist[j]))  // 如果选取的是同一个随机数  
    45.             {  
    46.                 k++;  
    47.                 continue;  
    48.             }  
    49.             string = string + clist[j].replace(" """) + " ";//将得到的k个随机点的坐标用一个字符串保存  
    50.         }  
    51.         OutputStream out2 = filesystem.create(new Path(outpath) );   
    52.         IOUtils.copyBytes(new ByteArrayInputStream(string.getBytes()), out2, 4096,true); //把随机点坐标字符串out2中  
    53.         System.out.println("初始化过程:"+string);  
    54.     }  
    55.   
    56. }  
    hadoop下kmeans算法实现四

    KMapper.java

    [java] view plaincopy
    1. import java.io.ByteArrayOutputStream;  
    2. import java.io.IOException;  
    3. import java.net.URI;  
    4. import java.util.StringTokenizer;  
    5.   
    6. import org.apache.hadoop.conf.Configuration;  
    7. import org.apache.hadoop.fs.FSDataInputStream;  
    8. import org.apache.hadoop.fs.FileSystem;  
    9. import org.apache.hadoop.fs.Path;  
    10. import org.apache.hadoop.io.IOUtils;  
    11. import org.apache.hadoop.io.LongWritable;  
    12. import org.apache.hadoop.io.Text;  
    13. import org.apache.hadoop.mapreduce.Mapper;  
    14.   
    15.   
    16. public class KMapper extends Mapper<LongWritable, Text, Text, Text> {  
    17.       
    18.     private String[] center;  
    19.     //读取3.txt中更新的中心点坐标,并将坐标存入center数组中  
    20.     protected void setup(Context context) throws IOException,InterruptedException  //read centerlist, and save to center[]  
    21.     {  
    22.         String centerlist = "hdfs://localhost:9000/home/administrator/hadoop/kmeans/input2/3.txt"; //center文件  
    23.         Configuration conf1 = new Configuration();  
    24.         conf1.set("hadoop.job.ugi""hadoop-user,hadoop-user");  
    25.        FileSystem fs = FileSystem.get(URI.create(centerlist),conf1);  
    26.        FSDataInputStream in = null;  
    27.        ByteArrayOutputStream out = new ByteArrayOutputStream();  
    28.        try{  
    29.                
    30.            in = fs.open( new Path(centerlist) );  
    31.            IOUtils.copyBytes(in,out,100,false);    
    32.            center = out.toString().split(" ");  
    33.            }finally{  
    34.                 IOUtils.closeStream(in);  
    35.             }  
    36.     }  
    37.     //从hadoop接收的数据在2.txt中保存  
    38.     public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException  
    39.     {  
    40.         StringTokenizer itr = new StringTokenizer(value.toString());  
    41.         //从2.txt读入数据,以空格为分割符,一个一个处理  
    42.         while(itr.hasMoreTokens())//用于判断所要分析的字符串中,是否还有语言符号,如果有则返回true,反之返回false  
    43.         {  
    44.               
    45.             //计算第一个坐标跟第一个中心的距离min  
    46.             String outValue = new String(itr.nextToken());//逐个获取以空格为分割符的字符串(2,3) (10,30) (34,40) (1,1)  
    47.             String[] list = outValue.replace("(""").replace(")""").split(",");  
    48.             String[] c = center[0].replace("(""").replace(")""").split(",");  
    49.             float min = 0;  
    50.             int pos = 0;  
    51.             for(int i=0;i<list.length;i++)  
    52.             {  
    53.                 System.out.println(i+"list:"+list[i]);  
    54.                 System.out.println(i+"c:"+c[i]);  
    55.                 min += (float) Math.pow((Float.parseFloat(list[i]) - Float.parseFloat(c[i])),2);//求欧式距离,为加根号  
    56.             }  
    57.               
    58.               
    59.             for(int i=0;i<center.length;i++)  
    60.             {  
    61.                 String[] centerStrings = center[i].replace("(""").replace(")""").split(",");  
    62.                 float distance = 0;  
    63.                 for(int j=0;j<list.length;j++)  
    64.                     distance += (float) Math.pow((Float.parseFloat(list[j]) - Float.parseFloat(centerStrings[j])),2);  
    65.                 if(min>distance)  
    66.                 {  
    67.                     min=distance;  
    68.                     pos=i;  
    69.                 }  
    70.             }  
    71.             context.write(new Text(center[pos]), new Text(outValue));//输出:中心点,对应的坐标  
    72.             System.out.println("中心点"+center[pos]+"对应坐标"+outValue);  
    73.             System.out.println("Mapper输出:"+center[pos]+" "+outValue);  
    74.         }  
    75.     }  
    76.   
    77. }  


    KReduce.java

    [java] view plaincopy
    1. import java.io.IOException;  
    2.   
    3. import org.apache.hadoop.io.Text;  
    4. import org.apache.hadoop.mapreduce.Reducer;  
    5.   
    6.   
    7. public class KReducer extends Reducer<Text, Text, Text, Text> {  
    8.     //<中心点类别,中心点对应的坐标集合>,每个中心点类别的坐标集合求新的中心点  
    9.       
    10.     public void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException  
    11.     {  
    12.         String outVal = "";  
    13.         int count=0;  
    14.         String center="";  
    15.         System.out.println("Reduce过程第一次");  
    16.         System.out.println(key.toString()+"Reduce");  
    17.         int length = key.toString().replace("(""").replace(")""").replace(":""").split(",").length;  
    18.         float[] ave = new float[Float.SIZE*length];  
    19.         for(int i=0;i<length;i++)  
    20.             ave[i]=0;   
    21.         for(Text val:value)  
    22.         {  
    23.             System.out.println("val:"+val.toString());  
    24.             System.out.println("values:"+value.toString());  
    25.             outVal += val.toString()+" ";  
    26.             String[] tmp = val.toString().replace("(""").replace(")""").split(",");  
    27.             System.out.println("temlength:"+tmp.length);  
    28.             for(int i=0;i<tmp.length;i++)  
    29.                 ave[i] += Float.parseFloat(tmp[i]);  
    30.             count ++;  
    31.         }  
    32.         System.out.println("count:"+count);  
    33.         System.out.println("outVal:"+outVal+"/outVal");  
    34.         for (int i=0;i<2;i++)  
    35.         {  
    36.             System.out.println("ave"+i+"i"+ave[i]);  
    37.         }  
    38.         //ave[0]存储X坐标之和,ave[1]存储Y坐标之和  
    39.         for(int i=0;i<length;i++)  
    40.         {  
    41.             ave[i]=ave[i]/count;  
    42.             if(i==0)  
    43.                 center += "("+ave[i]+",";  
    44.             else {  
    45.                 if(i==length-1)  
    46.                     center += ave[i]+")";  
    47.                 else {  
    48.                     center += ave[i]+",";  
    49.                 }  
    50.             }  
    51.         }  
    52.         System.out.println("写入part:"+key+" "+outVal+" "+center);  
    53.         context.write(key, new Text(outVal+center));  
    54.     }  
    55.   
    56. }  

    NewCenter.java

    [java] view plaincopy
    1. import java.io.ByteArrayInputStream;  
    2. import java.io.ByteArrayOutputStream;  
    3. import java.io.IOException;  
    4. import java.io.OutputStream;  
    5. import java.net.URI;  
    6.   
    7. import org.apache.hadoop.conf.Configuration;  
    8. import org.apache.hadoop.fs.FSDataInputStream;  
    9. import org.apache.hadoop.fs.FileSystem;  
    10. import org.apache.hadoop.fs.Path;  
    11. import org.apache.hadoop.io.IOUtils;  
    12.   
    13.   
    14. public class NewCenter {  
    15.       
    16.     int k = 2;  
    17.     float shold=Integer.MIN_VALUE;  
    18.     String[] line;  
    19.     String newcenter = new String("");  
    20.       
    21.     public float run(String[] args) throws IOException,InterruptedException  
    22.     {  
    23.         Configuration conf = new Configuration();  
    24.         conf.set("hadoop.job.ugi""hadoop,hadoop");   
    25.         FileSystem fs = FileSystem.get(URI.create(args[2]+"/part-r-00000"),conf);  
    26.         FSDataInputStream in = null;  
    27.         ByteArrayOutputStream out = new ByteArrayOutputStream();  
    28.         try{   
    29.             in = fs.open( new Path(args[2]+"/part-r-00000"));   
    30.             IOUtils.copyBytes(in,out,50,false);  
    31.             line = out.toString().split(" ");  
    32.             } finally {   
    33.                 IOUtils.closeStream(in);  
    34.             }  
    35.       
    36.         //System.out.println("上一次的MapReduce结果:"+out.toString());  
    37.         System.out.println("上一次MapReduce结果:第一行:"+line[0]);  
    38.         System.out.println("第二行:"+line[1]);  
    39.         System.out.println("。");  
    40.         for(int i=0;i<k;i++)  
    41.         {  
    42.             String[] l = line[i].replace(" "" ").split(" ");//如果这行有tab的空格,可以替代为空格  
    43.             //(key,values)key和values同时输出是,中间保留一个Tab的距离,即' '  
    44.             String[] startCenter = l[0].replace("(""").replace(")""").split(",");  
    45.             //上上次的中心点startCenter[0]=(10,30);startCenter[1]=(2,3);  
    46.             String[] finalCenter = l[l.length-1].replace("(""").replace(")""").split(",");  
    47.             //上一次的中心点finalCenter[0]=(22,35);finalCenter[1]=(1.5,2.0);  
    48.             float tmp = 0;  
    49.             for(int j=0;j<startCenter.length;j++)  
    50.                 tmp += Math.pow(Float.parseFloat(startCenter[j])-Float.parseFloat(finalCenter[j]), 2);  
    51.             //两个中心点间的欧式距离的平方  
    52.             newcenter = newcenter + l[l.length - 1].replace(" """) + " ";  
    53.             if(shold <= tmp)  
    54.                 shold = tmp;  
    55.             System.out.println(i+"坐标距离:"+tmp);  
    56.         }  
    57.         System.out.println("新中心点:"+newcenter);  
    58.         OutputStream out2 = fs.create(new Path(args[1]+"/center/3.txt") );   
    59.         IOUtils.copyBytes(new ByteArrayInputStream(newcenter.getBytes()), out2, 4096,true);  
    60.         //System.out.println(newcenter);  
    61.         return shold;  
    62.         //return 0;  
    63.     }  
    64.   
    65. }  

  • 相关阅读:
    【原创】简单快速软件开发平台,C/S架构二次开发平台
    【原创】进销存快速开发框架 (Winform三层架构+DevExpress+MsSQL)
    MES软件开发工具
    Winform C/S架构TMS物流运输管理系统司机车辆GPS+手机APP定位参考设计
    C#权限管理框架介绍|C/S系统快速开发框架权限系统设计
    C# Winform程序调用WebApi接口实现增删改查(CRUD)实例源码教程
    ASP.NETWebApi实例教程:如何部署和发布WebApi到IIS服务器详解
    Web后端开发框架|WebApi后端主流开发框架介绍
    Asp.Net开源服务端框架,WebApi后端框架(C#.NET)
    Winform布局开源框架,Winform控件框架,插件化框架
  • 原文地址:https://www.cnblogs.com/walccott/p/4956975.html
Copyright © 2011-2022 走看看