# Inline sample input: whitespace-separated records, each of the form
# "role_id,role_name,total_num,logtime".
dataset = """ role_1,u1,1,1 role_1,u1,2,2 role_1,u1,3,3 role_1,u1,4,4 role_2,u2,5,5 role_2,u2,6,6 """

# str.split() with no argument splits on any run of whitespace (spaces or
# newlines) and never yields empty strings, so no separate strip/filter pass
# is needed to discard blank records.
records = dataset.split()

# NOTE(review): `para.sc` is presumably a SparkContext supplied by the
# surrounding environment -- confirm against wherever `para` is defined.
data = para.sc.parallelize(records)

# Each record becomes a list of its four comma-separated fields.
data = data.map(lambda line: line.split(','))
print(data.take(6))

# Aggregate per (role_id, role_name): sum total_num and keep the smallest
# logtime.
# NOTE(review): logtime is compared as a string, so min() is lexicographic
# ("10" < "2") -- confirm this is intended if logtime is numeric.
unweighted = (
    data
    # Index-based access replaces the Python-2-only tuple-parameter lambda
    # `lambda (role_id, role_name, total_num, logtime): ...`.
    .map(lambda fields: ((fields[0], fields[1]),
                         (int(fields[2]), str(fields[3]))))
    .reduceByKey(lambda x, y: (x[0] + y[0], min(x[1], y[1])))
    # Flatten ((role_id, role_name), (total, logtime)) into a 4-tuple.
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1][0], kv[1][1]))
)
print(unweighted.take(2))