As we all know, up to Spark 1.6.2 the JavaSparkContext only provides two kinds of accumulators: Integer and Double.
Unfortunately, I ran into an Integer overflow and the program returned a negative number.
So I had to go through the underlying SparkContext to implement a Long accumulator.
// Custom AccumulatorParam that merges values with plain long addition.
public static class LongAccumulatorParam implements AccumulatorParam<Long>, Serializable {
    @Override
    public Long addAccumulator(final Long r, final Long t) {
        return r + t;
    }

    @Override
    public Long addInPlace(final Long r1, final Long r2) {
        return r1 + r2;
    }

    @Override
    public Long zero(final Long initialValue) {
        // Identity element for addition.
        return 0L;
    }
}
final Accumulator<Long> acc = jsc.sc().accumulator(0L, new LongAccumulatorParam());
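To sanity-check it, here is a minimal usage sketch; the sample data is made up for illustration, and it assumes the acc and jsc variables defined above (in my real job the counts were far beyond Integer range):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;

// Count elements with the Long accumulator defined above.
JavaRDD<Integer> data = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
data.foreach(x -> acc.add(1L));                 // adds happen on the executors
System.out.println("count = " + acc.value());   // merged total read on the driver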
Actually it is pretty simple. I haven't looked into Spark 2 yet; hopefully the developers have fixed this issue.
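For the record, Spark 2.x does introduce the AccumulatorV2 API with a built-in LongAccumulator, so the workaround above should no longer be needed there. A rough sketch, untested on my side and reusing the data RDD from the earlier snippet (the accumulator name is just a placeholder):

import org.apache.spark.util.LongAccumulator;

// Spark 2.x: built-in long-valued accumulator, no custom AccumulatorParam required.
LongAccumulator counter = jsc.sc().longAccumulator("recordCount");
data.foreach(x -> counter.add(1L));
System.out.println("count = " + counter.value());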