python - PySpark reduceByKey
I have been trying to make this work for a while, but I failed every time. I have two files. One has a list of names:
name1 name2 name3 name4
The other is a list of the values associated with those names for each day of the year, over several years:
['0.1,0.2,0.3,0.4', '0.5,0.6,0.7,0.8', '10,1000,0.2,5000' ...]
The goal is to have an output like:
name1: [0.1,0.5,10] name2: [0.2,0.6,1000] name3: [0.3,0.7,0.2] name4: [0.4,0.8,5000]
and to plot a histogram for each name. I wrote a mapper that creates a list of tuples and produces the following output (this is an RDD object):
[[('name1', [0.1]), ('name2', [0.2]), ('name3', [0.3]), ('name4', [0.4])],
 [('name1', [0.5]), ('name2', [0.6]), ('name3', [0.7]), ('name4', [0.8])],
 [('name1', [10]), ('name2', [1000]), ('name3', [0.2]), ('name4', [5000])]]
Now I need to concatenate the values for each name into a single list. Every map and (key, value) approach I have attempted returns the wrong result.
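For context, the per-record mapping is roughly the following (a simplified sketch rather than my exact code; names here is assumed to be a plain Python list read from the first file):

names = ['name1', 'name2', 'name3', 'name4']

def to_pairs(line):
    # '0.1,0.2,0.3,0.4' -> [('name1', [0.1]), ('name2', [0.2]), ('name3', [0.3]), ('name4', [0.4])]
    values = [float(v) for v in line.split(',')]
    return [(name, [value]) for name, value in zip(names, values)]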
You can loop through each inner list and create a dictionary using dict.setdefault(). Example -

>>> import pprint
>>> ll = [[('name1', [0.1]), ('name2', [0.2]), ('name3', [0.3]), ('name4', [0.4])],
...       [('name1', [0.5]), ('name2', [0.6]), ('name3', [0.7]), ('name4', [0.8])],
...       [('name1', [10]), ('name2', [1000]), ('name3', [0.2]), ('name4', [5000])]]
>>> d = {}
>>> for i in ll:
...     for tup in i:
...         d.setdefault(tup[0], []).extend(tup[1])
...
>>> pprint.pprint(d)
{'name1': [0.1, 0.5, 10],
 'name2': [0.2, 0.6, 1000],
 'name3': [0.3, 0.7, 0.2],
 'name4': [0.4, 0.8, 5000]}
For a PySpark RDD object, try a simple reduce function such as -

func = lambda x, y: x + y

and then pass it to the reduceByKey() method -

rdd.reduceByKey(func)
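Note that reduceByKey() expects a pair RDD of (name, [value]) tuples, so if the mapper produces one list of tuples per record, flatten it first. A minimal sketch, assuming values_rdd is a single RDD of the raw comma-separated strings and to_pairs is the per-record mapping sketched above:

flat_pairs = values_rdd.flatMap(to_pairs)              # RDD of (name, [value]) pairs
per_name = flat_pairs.reduceByKey(lambda x, y: x + y)  # concatenate the value lists per name

# per_name.collect() would give something like:
# [('name1', [0.1, 0.5, 10]), ('name2', [0.2, 0.6, 1000]), ...]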
As per the comments, the OP has a list of RDD objects (not a single RDD object). In that case you can convert each RDD object to a list by calling .collect(), apply the same logic as above, and then decide whether you want the result as a Python dictionary or as an RDD object. If you want the latter, call dict.items() to get the key-value pairs and pass them to sc.parallelize(). Example -
d = {}
for i in ll:
    c = i.collect()
    for tup in c:
        d.setdefault(tup[0], []).extend(tup[1])

rddobj = sc.parallelize(d.items())
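If you would rather avoid collecting everything to the driver, a possible alternative (a sketch, assuming each RDD in ll already holds (name, [value]) pairs) is to union the RDDs and do the reduction on the Spark side:

combined = sc.union(ll)                            # one pair RDD built from the list of RDDs
rddobj = combined.reduceByKey(lambda x, y: x + y)  # concatenate the value lists per name

# rddobj.collectAsMap() returns a plain dict if you need one for plotting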