聚类分析代码在本地调试正常,运行在spark cluster模式,运行结果出错?

0
本人,计算机小菜,刚刚接触Spark. 
在本地通过python实现了一个聚类算法,然后想把这个应用转化到spark上运行。在本地测试过代码正常,运行结果正常。但是转到spark上运行结果出现异常。
本地运行代码,和结果:
if __name__ == "__main__": wordseg = wordSeg.wordSeg() clustermanage = clusterAlgorithm.clusterAlgorithm() file_path = "/home/warrior/Desktop/skynews/" status, ls_results = commands.getstatusoutput("ls "+file_path) p = re.compile('''\w+''') file_name_list = p.findall(ls_results) print file_name_list #获取文件内容,按文件存入一个list file_content_list = [] for item in file_name_list: fd = open(file_path+item,'r') file_content_list.append(fd.read()) fd.close() #将对应文章进行词频统计,并以word--count形式存储,然后整篇文章仍旧以list存储 file_word_dict_list = [] for item in file_content_list: file_word_dict_list.append(wordseg.GetWordCountTable(item)) #建立一个包含所有文章词汇的字典 all_word_dict = wordseg.updateAllWordList(file_word_dict_list) #建立对应文章的词频特征 file_feature_dict_list = [] for item in file_word_dict_list: file_feature_dict_list.append(wordseg.constructFeatureDict(item, all_word_dict)) #初选k个类作为初始类中心 k_class_centre = file_feature_dict_list[0:4] #进行一次迭代,并记录下每篇文章属于哪个类, #k_class_record这个list的index索引与file_feature_dict_list的文章索引相对应,对应索引上的值表示该文章属于哪个类 k_class_record = [] for item in file_feature_dict_list: k_class_record.append(clustermanage.once_K_MeansCluster(clustermanage.calcu_cosine_corr,item, k_class_centre, 4)) print k_class_record print "hello!!"
本地运行结果:
[0, 1, 2, 3, 0, 0, 3, 0, 3, 2, 3, 2, ...... 2, 3, 0]
spark 代码:
from pyspark import SparkContext import commands from sparkClusterAlgorithm import * from wordSeg import * sc = SparkContext(appName="newsCluster") ...... #从HDFS获取文件 files = sc.wholeTextFiles("hdfs://warrior:9000/testData/skynews") file_list = files.map(lambda item: item[1]) file_wc_dict_list = file_list.map(lambda file_content:wordseg.GetWordCountTable(file_content)) file_wc_dict_list_result = file_wc_dict_list.collect() all_word_result = wordseg.updateAllWordList(file_wc_dict_list_result) file_feature_dict_list = file_wc_dict_list.map(lambda file_wc_dict: wordseg.constructFeatureDict(file_wc_dict, all_word_result)) file_feature_dict_result = file_feature_dict_list.collect() k_class_sample_record = [] k_class_sample_record = file_feature_dict_list.map(lambda file_feature_dict:clustermanage.K_MeansCluster(clustermanage.calcu_cosine_corr, file_feature_dict, file_feature_dict_result, 4)) k_class_sample_record_result = k_class_sample_record.collect() print k_class_sample_record_result sc.stop()
spark cluster输出结果:
[0, 1, 0, 3, 0, 0, 0, 0, 0, 3, 0, 1, ...... 0, 3, 0]
通过对比,集群的运算出现异常,因为在前四片文章,
本地是
"0, 1, 2, 3,"
集群是
"0, 1, 0, 3,"
正确的结果正如本地代码的结果,因为初选的就是前四篇文章作为类的初始迭代中心。
很是疑惑,初入门spark,望大家给解答下!!
不胜感激!!
已邀请:

要回复问题请先登录注册