TwoPhase_Matrix_Multiplication.py
#!/usr/bin/env python
# encoding: utf-8
'''
Matrix Multiplication: TwoPhase approach
@author: Cheng-Lin Li a.k.a. Clark Li
@copyright: 2017 Cheng-Lin Li@University of Southern California. All rights reserved.
@license: Licensed under the GNU GPL v3.0. https://www.gnu.org/licenses/gpl.html
@contact: [email protected]
@version: 1.0
@create: February, 10, 2017
@updated: February, 10, 2017
'''
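# Two-phase scheme implemented below:
#   Phase 1: key A[i,k] and B[k,j] by the shared index k, group the entries,
#            and emit one ((i, j), A[i,k] * B[k,j]) pair per matching pair of entries.
#   Phase 2: sum all pairs that share the same (i, j) key to obtain C[i, j].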
from __future__ import print_function
# Import the necessary Spark library classes, as well as sys
from pyspark import SparkConf, SparkContext, StorageLevel
import sys
from operator import add
APP_NAME = "TwoPhaseMatrixMultiplication"
DEBUG = True
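# When DEBUG is True, intermediate results and the final products are printed to stdout;
# when False, the products are written to the output file instead.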
# Get input and output parameters
if len(sys.argv) != 4:
    print('Usage: ' + sys.argv[0] + ' <mat-A/values.txt> <mat-B/values.txt> <output.txt>')
    sys.exit(1)
# Assign the input and output variables
matrix_a = sys.argv[1]
matrix_b = sys.argv[2]
output = sys.argv[3]
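# Each input file is expected to contain one matrix entry per line in 'row,col,value' CSV form
# (see the mapper comments below).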
# Multiply every matching pair of elements from the two matrices
def mul(elements):  # elements = (key, [('A', i, value), ..., ('B', j, value), ...])
    _ma = list()
    _mb = list()
    _result = list()
    for _lst in elements[1]:  # elements[1] = [('A', i, value), ..., ('B', j, value), ...]
        if str(_lst[0]) == "A":
            _ma.append([str(_lst[1]), str(_lst[2])])
        else:
            _mb.append([str(_lst[1]), str(_lst[2])])
    for _elA in _ma:  # [[idx1, v1], [idx2, v2], ...]
        for _elB in _mb:
            _result.append(((str(_elA[0]), str(_elB[0])), int(_elA[1]) * int(_elB[1])))
    if DEBUG: print('_result=%s' % (str(_result)))
    return _result  # ((i, j), partial product)
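# Illustrative example: mul(('1', [('A', '0', '2'), ('A', '1', '3'), ('B', '0', '4')]))
# returns [(('0', '0'), 8), (('1', '0'), 12)], i.e. one partial product for every
# (A-entry, B-entry) pair that shares the key k = 1.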
# Create a configuration for this job
conf = SparkConf().setAppName(APP_NAME)
# Create a context for the job.
sc = SparkContext(conf=conf)
# Create RDDs from the external files for matrices A and B.
rddALines = sc.textFile(matrix_a) # ["0,0,A[0,0]", ..., "i, j, A[i,k]"]
rddBLines = sc.textFile(matrix_b) # ["0,0,B[0,0]", ..., "k, j, B[k,j]"]
# Phase 1 Map Task
# Key every entry of A by its column index k and every entry of B by its row index k:
# [i, k, A[i,k]] => (k, ('A', i, A[i, k]))
# [k, j, B[k,j]] => (k, ('B', j, B[k, j]))
rddPhaseOneMapperA = rddALines.map(lambda x:x.split(',')).map(lambda data: (data[1],['A', data[0],data[2]]))
rddPhaseOneMapperB = rddBLines.map(lambda x:x.split(',')).map(lambda data: (data[0],['B', data[1],data[2]]))
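# Union the two keyed RDDs and group by k, so that every A[i,k] meets every B[k,j] with the same k.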
rddPhaseOneMapperResult = rddPhaseOneMapperA.union(rddPhaseOneMapperB).groupByKey().map(lambda x:(x[0], list(x[1])))
if DEBUG: print("==========>Phase 1 Map finished")
# Phase 1 Reduce Task
# (key1, [('A',i, value)...('B',j, value)...], ...key n, [....])
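# Each product A[i,k] * B[k,j] is emitted as ((i, j), product); persist() keeps the serialized
# pairs in memory so Phase 2 does not recompute Phase 1.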
rddPhaseOneReducer = rddPhaseOneMapperResult.flatMap(lambda e: mul(e)).persist(StorageLevel.MEMORY_ONLY_SER)
if DEBUG: print("==========>Phase 1 Reduce collect() start")
#PhaseOneReducerResult = rddPhaseOneReducer.collect()
if DEBUG: print("==========>Phase 1 Reduce finished")
# Phase 2 Map Task
#rddP2Input = sc.parallelize(PhaseOneReducerResult)
#rddPhaseTwoMapper = rddP2Input.map(lambda x: x)
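# The Phase 2 mapper is an identity map, kept only to mirror the classic two-phase MapReduce structure.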
rddPhaseTwoMapper = rddPhaseOneReducer.map(lambda x: x)
if DEBUG: print("==========>Phase 2 Map finished")
# Phase 2 Reduce Task
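# Summing the partial products for each (i, j) key gives C[i, j] = sum over k of A[i,k] * B[k,j].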
rddPhaseTwoReducer = rddPhaseTwoMapper.reduceByKey(lambda x, y: x + y)
rddPhaseTwoReducerResult = rddPhaseTwoReducer.collect()
if DEBUG: print("==========>Phase 2 Reduce finished")
# Print the results: to stdout when DEBUG is True, otherwise redirect stdout to the output file.
try:
    if not DEBUG:
        orig_stdout = sys.stdout
        f = open(output, 'w')
        sys.stdout = f
    for _x in rddPhaseTwoReducerResult:
        print("%s,%s\t%d" % (_x[0][0], _x[0][1], _x[1]))
    sys.stdout.flush()
    if not DEBUG:
        sys.stdout = orig_stdout
        f.close()
except IOError as _err:
    if DEBUG:
        print('File error: ' + str(_err))
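# Example submission (assuming spark-submit is on the PATH and the input files use the
# 'row,col,value' format):
#   spark-submit TwoPhase_Matrix_Multiplication.py mat-A/values.txt mat-B/values.txt output.txt
# With DEBUG = True the "i,j<TAB>value" products are printed to stdout; set DEBUG = False to
# write them to the output file instead.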