Decision Tree such as C4.5 is easy to parallel. Following is an example.
This is a non-parallel version:
public void learnFromDataSet(Iterable<Sample<FK, FV, Boolean>> dataset){ for(Sample sample : dataset){ model.addSample((MapBasedBinarySample<FK, FV>)sample); } Queue<TreeNode<FK, FV>> Q = new LinkedList<TreeNode<FK, FV>>(); TreeNode<FK, FV> root = model.selectRootTreeNode(); model.addTreeNode(root); Q.add(root); while (!Q.isEmpty()){ TreeNode v = Q.poll(); if(v.getDepth() >= model.getMaxDepth()){ continue; } FeatureSplit<FK> featureSplit = model.selectFeature(v); if(featureSplit.getFeatureId() == null){ continue; } v.setFeatureSplit(featureSplit); Pair<TreeNode<FK,FV>, TreeNode<FK, FV>> children = model.newTreeNode(v, featureSplit); TreeNode leftNode = children.getKey(); TreeNode rightNode = children.getValue(); if(leftNode != null && leftNode.getSampleSize() > model.getMinSampleSizeInNode()){ v.setLeft(leftNode); model.addTreeNode(leftNode); Q.add(leftNode); } if(rightNode != null && rightNode.getSampleSize() > model.getMinSampleSizeInNode()){ v.setRight(rightNode); model.addTreeNode(rightNode); Q.add(rightNode); } } }
And this is a parallel version:
public class NodeSplitThread implements Runnable{ private TreeNode<FK, FV> node = null; private Queue<TreeNode<FK, FV>> Q = null; public NodeSplitThread(TreeNode<FK, FV> node, Queue<TreeNode<FK, FV>> Q){ this.node = node; this.Q = Q; } @Override public void run() { if(node.getDepth() >= model.getMaxDepth()){ return; } FeatureSplit<FK> featureSplit = model.selectFeature(node); if(featureSplit.getFeatureId() == null){ return; } node.setFeatureSplit(featureSplit); Pair<TreeNode<FK,FV>, TreeNode<FK, FV>> children = model.newTreeNode(node, featureSplit); TreeNode<FK, FV> leftNode = children.getKey(); TreeNode<FK, FV> rightNode = children.getValue(); if(leftNode != null && leftNode.getSampleSize() > model.getMinSampleSizeInNode()){ node.setLeft(leftNode); model.addTreeNode(leftNode); Q.add(leftNode); } if(rightNode != null && rightNode.getSampleSize() > model.getMinSampleSizeInNode()){ node.setRight(rightNode); model.addTreeNode(rightNode); Q.add(rightNode); } } } public List<TreeNode<FK, FV>> pollTopN(Queue<TreeNode<FK, FV>> Q, int n){ List<TreeNode<FK, FV>> ret = new ArrayList<TreeNode<FK, FV>>(); for(int i = 0; i < n; ++i){ if(Q.isEmpty()) break; TreeNode<FK, FV> node = Q.poll(); ret.add(node); } return ret; } @Override public void learnFromDataSet(Iterable<Sample<FK, FV, Boolean>> dataset){ for(Sample sample : dataset){ model.addSample((MapBasedBinarySample<FK, FV>)sample); } Queue<TreeNode<FK, FV>> Q = new ConcurrentLinkedQueue<TreeNode<FK, FV>>(); TreeNode<FK, FV> root = model.selectRootTreeNode(); model.addTreeNode(root); Q.add(root); ExecutorService threadPool = Executors.newFixedThreadPool(10); while (!Q.isEmpty()){ List<TreeNode<FK, FV>> nodes = pollTopN(Q, 10); List<Future> tasks = new ArrayList<Future>(nodes.size()); for(TreeNode<FK, FV> node : nodes){ Future task = threadPool.submit(new NodeSplitThread(node, Q)); tasks.add(task); } for(Future task : tasks){ try { task.get(); } catch (InterruptedException e) { continue; } catch (ExecutionException e) { continue; } } } threadPool.shutdown(); try { threadPool.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { threadPool.shutdownNow(); Thread.interrupted(); } threadPool.shutdownNow(); }