2013
10.03
10.03
2PMMS(Two-Phase Multiway Merge Sort,两阶段多路归并排序),顾名思义,由两个阶段组成。第一阶段将源文件分批读入内存,用某种内部排序算法排序,再把每批排序后的结果写到硬盘上,一个批次对应一个文件;第二阶段把这N个文件各读一部分到各自的输入缓存中,每次从N个输入缓存的当前位置中找出最小数,放入输出缓存。每放入一个数后,都要检查各输入缓存是否已读完,如果读完,则从对应的文件继续读数据到该缓存,直到所有文件全部读完为止;同时检查输出缓存是否已满,如已满,则把它写到结果文件并清空输出缓存。
4个类:
(1)
public class Main { /* * 2 arguments * first argument: inputfile ex: d:\\data\\input.dat * second argument: output folder ex: d:\\data */ public static void main(String[] args) throws Exception { if(args.length!=2){ System.out.println("arguments are not valid"); return; } long startTime = System.currentTimeMillis(); //read data, sort, output sorted data to files PhaseOne phaseOne = new PhaseOne(); List<String> fileNameList = phaseOne.generateSortedFile(args[0], args[1]); //merge PhaseTwo phaseTwo = new PhaseTwo(); phaseTwo.merge(fileNameList, args[1]); //delete temp files for(String fileName: fileNameList){ new File(fileName).delete(); } long finishTime = System.currentTimeMillis(); double totalTime = (double)(finishTime - startTime) / 1000; System.out.println("total time: " + totalTime + " seconds"); } }
(2)
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class PhaseOne { private final int amountOnce = 550000; public List<String> generateSortedFile(String inputFile, String outputFolder) { List<String> fileNameList = new ArrayList<String>(); BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(inputFile)); String tempString = null; int line = 0; int[] array = new int[amountOnce]; while ((tempString = reader.readLine()) != null) { //System.out.println("line " + line + ": " + tempString); array[line % amountOnce] = Integer.parseInt(tempString); if(line % amountOnce == amountOnce -1){ String fileNO = (line/amountOnce)<10 ? "0"+ line/amountOnce : String.valueOf(line/amountOnce); String fileName = outputFolder+ "/tmp"+ fileNO+".dat"; writeData(array, line%amountOnce+1, fileName,fileNameList); } line++; } String fileNO = (line/amountOnce)<10 ? "0"+ line/amountOnce : String.valueOf(line/amountOnce); String fileName = outputFolder+ "/tmp"+ fileNO+".dat"; writeData(array, line%amountOnce, fileName, fileNameList); reader.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } return fileNameList; } } private void writeData(int[] array, int length, String fileName, List<String> fileNames) throws IOException{ Quicksort.quickSort(array, length); BufferedWriter output = new BufferedWriter(new FileWriter(fileName)); for(int i=0;i<length;i++){ output.write(String.valueOf(array[i])); output.write("\r\n"); } output.close(); fileNames.add(fileName); array = new int[amountOnce]; } }
(3)
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;

/**
 * Phase two of the two-phase multiway merge sort: merges the sorted run files produced by
 * phase one into a single sorted result file.
 *
 * <p>Each run gets an explicit input buffer of {@link #inputBufferSize} values. Merged values
 * are collected in an output buffer of {@link #outputBufferSize} values before being written.
 * The end of a run is tracked with a per-buffer count of valid entries rather than a sentinel
 * value, so every int (including zero and negatives) is merged.
 */
public class PhaseTwo {
    /** Capacity, in values, of each run's input buffer; read once at the start of each merge. */
    public static int inputBufferSize = 30000;

    /** Capacity, in values, of the output buffer; read once at the start of each merge. */
    public static int outputBufferSize = 100000;

    /**
     * Reset to {@code false} when {@link #merge} starts. Set to {@code true} once every run has
     * been consumed and the result written. Kept for callers that inspect it; the merge loop
     * does not depend on it.
     */
    public static boolean finishFlag = false;

    /**
     * Merges sorted run files into {@code outputFolder + "/result.dat"}.
     *
     * <p>Each run file must hold one integer per line in ascending order; blank lines are
     * skipped. The result file is overwritten, not appended to, and its lines end with CRLF.
     * Equal values from different runs are taken from the run listed first.
     *
     * @param fileNameList paths of the sorted run files; an empty list yields an empty result
     * @param outputFolder directory in which {@code result.dat} is created
     * @throws RuntimeException wrapping the {@link IOException} if a run cannot be read or the
     *     result cannot be written
     * @throws NumberFormatException if a non-blank line in a run is not an integer
     */
    public void merge(List<String> fileNameList, String outputFolder) {
        finishFlag = false;
        // Read the tunables once; clamp to 1 because a zero-sized buffer could never hold a value.
        int inputSize = Math.max(1, inputBufferSize);
        int outputSize = Math.max(1, outputBufferSize);
        String outputFile = outputFolder + "/result.dat";
        int runCount = fileNameList.size();

        List<BufferedReader> readers = new ArrayList<BufferedReader>(runCount);
        BufferedWriter output = null;
        try {
            int[][] inputBuffers = new int[runCount][inputSize];
            int[] counts = new int[runCount];   // number of valid values in each input buffer
            int[] pointers = new int[runCount]; // next unconsumed index in each input buffer
            for (int i = 0; i < runCount; i++) {
                BufferedReader reader = new BufferedReader(new FileReader(fileNameList.get(i)));
                readers.add(reader);
                counts[i] = fillBuffer(reader, inputBuffers[i]);
            }

            // Truncate rather than append, so an old result.dat never leaks into this one.
            output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outputFile)));
            int[] outputBuffer = new int[outputSize];
            int outputCount = 0;
            int run;
            while ((run = findMin(inputBuffers, pointers, counts)) >= 0) {
                outputBuffer[outputCount++] = inputBuffers[run][pointers[run]++];
                if (pointers[run] == counts[run]) {
                    // Input buffer drained: refill it. A count of 0 means the run is exhausted.
                    counts[run] = fillBuffer(readers.get(run), inputBuffers[run]);
                    pointers[run] = 0;
                }
                if (outputCount == outputSize) {
                    outputData(output, outputBuffer, outputCount);
                    outputCount = 0;
                }
            }
            outputData(output, outputBuffer, outputCount);
            output.close(); // close here, not only in finally, so a failed flush is reported
            finishFlag = true;
            System.out.println("------------------");
        } catch (IOException e) {
            throw new RuntimeException(
                    "failed to merge " + runCount + " runs into " + outputFile, e);
        } finally {
            closeQuietly(output);
            // Only the readers that were actually opened are in the list.
            for (BufferedReader reader : readers) {
                closeQuietly(reader);
            }
        }
    }

    /**
     * Returns the index of the run whose current value is smallest, or -1 when every run is
     * exhausted. A run is live while {@code pointers[i] < counts[i]}. Ties go to the lowest index.
     */
    private static int findMin(int[][] buffers, int[] pointers, int[] counts) {
        int best = -1;
        for (int i = 0; i < buffers.length; i++) {
            if (pointers[i] < counts[i]
                    && (best < 0 || buffers[i][pointers[i]] < buffers[best][pointers[best]])) {
                best = i;
            }
        }
        return best;
    }

    /**
     * Reads up to {@code buffer.length} integers (one per line, blank lines skipped) into
     * {@code buffer}. Returns how many were read, which is 0 once the reader is at end of file.
     */
    private static int fillBuffer(BufferedReader reader, int[] buffer) throws IOException {
        int count = 0;
        String line;
        // Check capacity before reading so no line is consumed and then lost.
        while (count < buffer.length && (line = reader.readLine()) != null) {
            line = line.trim();
            if (line.length() > 0) {
                buffer[count++] = Integer.parseInt(line);
            }
        }
        return count;
    }

    /** Writes {@code buffer[0 .. length-1]} to {@code output}, one value per CRLF-terminated line. */
    private static void outputData(BufferedWriter output, int[] buffer, int length)
            throws IOException {
        for (int i = 0; i < length; i++) {
            output.write(String.valueOf(buffer[i]));
            output.write("\r\n");
        }
    }

    /**
     * Closes {@code resource} if it is non-null, ignoring any failure. Used only in cleanup,
     * where either the outcome was already reported or an earlier exception is propagating.
     */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful to do here; see the method comment.
        }
    }
}
(4)
public class Quicksort { public static void quickSort(int[] data, int length) { recQuickSort(data, 0, length-1); } private static void recQuickSort(int[] data,int left, int right) { if(right-left <= 0){ return; } else { long pivot = data[right]; int partition = partitionIt(data, left, right, pivot); recQuickSort(data, left, partition-1); recQuickSort(data, partition+1, right); } } private static int partitionIt(int[] data, int left, int right, long pivot) { int leftPtr = left-1; int rightPtr = right; while(true) { while(data[++leftPtr] < pivot ) ; while(rightPtr > 0 && data[--rightPtr] > pivot) ; if(leftPtr >= rightPtr){ break; } else{ swap(data, leftPtr, rightPtr); } } swap(data, leftPtr, right); return leftPtr; } private static void swap(int[] data, int dex1, int dex2) { int temp = data[dex1]; data[dex1] = data[dex2]; data[dex2] = temp; } }