2PMMS(2-phase multiway merge sort),顾名思义,由2个阶段组成。第一个阶段将源文件分批次读到内存中,采用某种内部排序算法进行排序,然后将每次排序后的结果写到硬盘上,1个批次对应1个文件;第二个阶段将n个文件批量读到缓存里,从N个缓存序列里找到最小数,放到输出缓存里。每次放完后,要检查每个输入缓存是否读完,如果读完则从对应的文件里继续读数据到缓存,直到文件全部读完为止;并检查输出缓存是否已满,如满则输出到文件并清空输出缓存。
4个类:
(1)
public class Main {
/*
* 2 arguments
* first argument: inputfile ex: d:\\data\\input.dat
* second argument: output folder ex: d:\\data
*/
public static void main(String[] args) throws Exception {
if(args.length!=2){
System.out.println("arguments are not valid");
return;
}
long startTime = System.currentTimeMillis();
//read data, sort, output sorted data to files
PhaseOne phaseOne = new PhaseOne();
List<String> fileNameList = phaseOne.generateSortedFile(args[0], args[1]);
//merge
PhaseTwo phaseTwo = new PhaseTwo();
phaseTwo.merge(fileNameList, args[1]);
//delete temp files
for(String fileName: fileNameList){
new File(fileName).delete();
}
long finishTime = System.currentTimeMillis();
double totalTime = (double)(finishTime - startTime) / 1000;
System.out.println("total time: " + totalTime + " seconds");
}
}
(2)
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class PhaseOne {
private final int amountOnce = 550000;
public List<String> generateSortedFile(String inputFile, String outputFolder) {
List<String> fileNameList = new ArrayList<String>();
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(inputFile));
String tempString = null;
int line = 0;
int[] array = new int[amountOnce];
while ((tempString = reader.readLine()) != null) {
//System.out.println("line " + line + ": " + tempString);
array[line % amountOnce] = Integer.parseInt(tempString);
if(line % amountOnce == amountOnce -1){
String fileNO = (line/amountOnce)<10 ? "0"+ line/amountOnce : String.valueOf(line/amountOnce);
String fileName = outputFolder+ "/tmp"+ fileNO+".dat";
writeData(array, line%amountOnce+1, fileName,fileNameList);
}
line++;
}
String fileNO = (line/amountOnce)<10 ? "0"+ line/amountOnce : String.valueOf(line/amountOnce);
String fileName = outputFolder+ "/tmp"+ fileNO+".dat";
writeData(array, line%amountOnce, fileName, fileNameList);
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
}
catch (IOException e1) {
}
}
return fileNameList;
}
}
private void writeData(int[] array, int length, String fileName, List<String> fileNames) throws IOException{
Quicksort.quickSort(array, length);
BufferedWriter output = new BufferedWriter(new FileWriter(fileName));
for(int i=0;i<length;i++){
output.write(String.valueOf(array[i]));
output.write("\r\n");
}
output.close();
fileNames.add(fileName);
array = new int[amountOnce];
}
}
(3)
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
public class PhaseTwo {
public static int inputBufferSize = 30000;
public static int outputBufferSize = 100000;
public static boolean finishFlag = false;
public void merge(List<String> fileNameList, String outputFolder) {
int[] outputBuffer = new int[outputBufferSize];
int[][] inputBuffers = new int[fileNameList.size()][inputBufferSize];
List<BufferedReader> readers = new ArrayList<BufferedReader>();
try {
for(int i=0;i<fileNameList.size();i++){
BufferedReader reader = new BufferedReader(new FileReader(fileNameList.get(i)));
readers.add(reader);
String tempString = null;
int line = 0;
while ((tempString = readers.get(i).readLine()) != null) {
inputBuffers[i][line] = Integer.parseInt(tempString);
line++;
if(line >= inputBufferSize){
break;
}
}
}
int[] pointers = new int[fileNameList.size()];
int outputBufferIndex = 0;
while(true){
int min = findMin(inputBuffers,pointers, readers);
outputBuffer[outputBufferIndex % outputBufferSize] = min;
if(outputBufferIndex % outputBufferSize == outputBufferSize-1){
outputData(outputBuffer, outputBufferSize, outputFolder+"/result.dat");
}
outputBufferIndex++;
if(finishFlag){
break;
}
}
outputData(outputBuffer, outputBufferIndex%outputBufferSize, outputFolder+"/result.dat");
for(int i=0;i<fileNameList.size();i++){
readers.get(i).close();
}
System.out.println("------------------");
} catch (IOException e) {
e.printStackTrace();
} finally {
for(int i=0;i<fileNameList.size();i++){
if (readers.get(i) != null) {
try {
readers.get(i).close();
}
catch (IOException e1) {
}
}
}
}
}
private int findMin(int[][] arrays, int[] pointers, List<BufferedReader> readers) throws IOException{
int min = 0;
int index = 0;
for(int i=0;i<arrays.length;i++){
if(arrays[i][pointers[i]]>0){
min = arrays[i][pointers[i]];
index = i;
break;
}
}
for(int i=index+1;i<arrays.length;i++){
if(arrays[i][pointers[i]]<=0){
continue;
}
if(arrays[i][pointers[i]] < min){
min = arrays[i][pointers[i]];
index = i;
}
}
pointers[index] = pointers[index]+1;
for(int i=0;i<pointers.length;i++){
if(pointers[i]>=inputBufferSize){
arrays[i] = new int[inputBufferSize];
String tempString = null;
int line = 0;
while ((tempString = readers.get(i).readLine()) != null) {
arrays[i][line] = Integer.parseInt(tempString);
line++;
if(line >= inputBufferSize){
break;
}
}
pointers[i]= 0;
}
}
//update finishFlag
if(!finishFlag){
boolean flag = true;
for(int i=0;i<arrays.length;i++){
if(arrays[i][pointers[i]]>0){
flag = false;
break;
}
}
finishFlag = flag;
}
return min;
}
private void outputData(int[] outputBuffer, int length, String outputFile) throws IOException{
BufferedWriter output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outputFile, true)));
for(int i=0;i<length;i++){
StringBuffer sb = new StringBuffer();
sb.append(outputBuffer[i]);
sb.append("\r\n");
output.write(sb.toString());
}
output.close();
outputBuffer = new int[outputBufferSize];
}
}
(4)
public class Quicksort {
public static void quickSort(int[] data, int length) {
recQuickSort(data, 0, length-1);
}
private static void recQuickSort(int[] data,int left, int right) {
if(right-left <= 0){
return;
}
else {
long pivot = data[right];
int partition = partitionIt(data, left, right, pivot);
recQuickSort(data, left, partition-1);
recQuickSort(data, partition+1, right);
}
}
private static int partitionIt(int[] data, int left, int right, long pivot) {
int leftPtr = left-1;
int rightPtr = right;
while(true) {
while(data[++leftPtr] < pivot )
;
while(rightPtr > 0 && data[--rightPtr] > pivot)
;
if(leftPtr >= rightPtr){
break;
}
else{
swap(data, leftPtr, rightPtr);
}
}
swap(data, leftPtr, right);
return leftPtr;
}
private static void swap(int[] data, int dex1, int dex2) {
int temp = data[dex1];
data[dex1] = data[dex2];
data[dex2] = temp;
}
}