Java: Parallelizing quick sort via multi-threading

Developer https://www.devze.com 2023-03-27 21:28 Source: web
I am experimenting with parallelizing algorithms in Java. I began with merge sort, and posted my attempt in this question. My revised attempt is in the code below, where I now try to parallelize quick sort.

Are there any rookie mistakes in my multi-threaded implementation or approach to this problem? If not, shouldn't I expect more than a 32% reduction in running time between a sequential and a parallelized algorithm on a dual-core machine (see timings at bottom)?

Here is the multithreading algorithm:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Here is how I start it off:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

I tested this against Arrays.sort and a similar sequential quick sort algorithm. Here are the timing results on an Intel dual-core Dell laptop, in seconds:

Elements: 500,000, sequential: 0.068592, threaded: 0.046871, Arrays.sort: 0.079677

Elements: 1,000,000, sequential: 0.14416, threaded: 0.095492, Arrays.sort: 0.167155

Elements: 2,000,000, sequential: 0.301666, threaded: 0.205719, Arrays.sort: 0.350982

Elements: 4,000,000, sequential: 0.623291, threaded: 0.424119, Arrays.sort: 0.712698

Elements: 8,000,000, sequential: 1.279374, threaded: 0.859363, Arrays.sort: 1.487671

Each number above is the average time of 100 tests, throwing out the 3 lowest and 3 highest cases. I used Random.nextInt(Integer.MAX_VALUE) to generate an array for each test, which was initialized once every 10 tests with the same seed. Each test consisted of timing the given algorithm with System.nanoTime. I rounded to six decimal places after averaging. And obviously, I did check to see if each sort worked.
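
The averaging described above (100 runs, dropping the 3 lowest and 3 highest) could be sketched roughly as follows. This is not the asker's harness, which was not posted: the class name `TrimmedTiming` and the methods `trimmedMean`, `timeOnce` and `benchmark` are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical helper: all names here are illustrative, not from the question.
class TrimmedTiming {

    // Mean of the samples after discarding the `trim` lowest and `trim` highest values.
    static double trimmedMean(double[] samples, int trim) {
        if (samples.length <= 2 * trim) {
            throw new IllegalArgumentException("need more than 2 * trim samples");
        }
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        double sum = 0;
        for (int i = trim; i < sorted.length - trim; i++) {
            sum += sorted[i];
        }
        return sum / (sorted.length - 2 * trim);
    }

    // Times one run of the sort on a private copy of the input, in seconds.
    static double timeOnce(Consumer<int[]> sortUnderTest, int[] input) {
        int[] copy = input.clone(); // every run sees the same unsorted data
        long t0 = System.nanoTime();
        sortUnderTest.accept(copy);
        return (System.nanoTime() - t0) / 1e9;
    }

    // Runs the sort `runs` times and returns the trimmed mean of the timings.
    static double benchmark(Consumer<int[]> sortUnderTest, int[] input, int runs, int trim) {
        double[] times = new double[runs];
        for (int r = 0; r < runs; r++) {
            times[r] = timeOnce(sortUnderTest, input);
        }
        return trimmedMean(times, trim);
    }
}
```

With this sketch, `TrimmedTiming.benchmark(a -> Arrays.sort(a), data, 100, 3)` would correspond to one cell of the table above. It does no JIT warm-up, so early runs may be slower than later ones.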

As you can see, the threaded version takes about 32% less time than the sequential one (a speedup of roughly 1.5x) in every set of tests. As I asked above, shouldn't I expect more than that?


Making num_threads a plain static field can cause problems: it is read and updated from several threads without any synchronization, so it is highly likely that you will end up with more than MAX_THREADS running at some point.

The likely reason you don't get a full doubling of performance is that your quick sort cannot be fully parallelised. Note that the first call to quicksort makes a pass through the whole array in the initial thread before anything really runs in parallel. Parallelising an algorithm also carries an overhead, in the form of context switching and mode transitions when farming work off to separate threads.
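
One way to put a number on "cannot be fully parallelised" is Amdahl's law, S = 1 / ((1 - f) + f / p), where f is the fraction of the work that runs in parallel and p is the number of processors. The law is not mentioned in the question; applying it here is only a rough model, and the class and method names below are made up for illustration. The sketch solves the formula for f using the 500,000-element timings from the question.

```java
// Hypothetical sketch: names are illustrative. Amdahl's law is a simplification
// that ignores thread start-up cost, cache effects and uneven partitions.
class SpeedupEstimate {

    // Observed speedup: sequential time divided by parallel time.
    static double speedup(double sequentialSeconds, double parallelSeconds) {
        return sequentialSeconds / parallelSeconds;
    }

    // Amdahl's law: predicted speedup for parallel fraction f on p processors.
    static double amdahlSpeedup(double f, int p) {
        return 1.0 / ((1.0 - f) + f / p);
    }

    // Amdahl's law solved for f, given an observed speedup on p processors.
    static double parallelFraction(double observedSpeedup, int p) {
        return (1.0 - 1.0 / observedSpeedup) / (1.0 - 1.0 / p);
    }

    public static void main(String[] args) {
        // Timings for 500,000 elements, copied from the question.
        double s = speedup(0.068592, 0.046871);
        double f = parallelFraction(s, 2);
        System.out.printf("speedup %.2f, implied parallel fraction %.2f%n", s, f);
    }
}
```

Under this model the measured speedup of about 1.46 on two cores corresponds to roughly 63% of the running time actually overlapping, which is consistent with a sequential first partitioning pass plus thread overhead.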

Have a look at the Fork/Join framework; this problem would probably fit quite neatly there.
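
A minimal sketch of what a Fork/Join version might look like, assuming Java 8 or later for `ForkJoinPool.commonPool()`. The class name `ForkJoinQuick`, the `THRESHOLD` value and the middle-element pivot are assumptions made for this example (the question's `medianOfThree` and `swap` were not posted), and it has not been benchmarked.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical sketch: names and the cutoff value are illustrative.
class ForkJoinQuick extends RecursiveAction {
    // Assumed cutoff below which forking costs more than it saves; tune by measuring.
    static final int THRESHOLD = 10_000;

    private final int[] array;
    private final int start, end; // inclusive bounds, as in the question

    ForkJoinQuick(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start + 1 <= THRESHOLD) {
            Arrays.sort(array, start, end + 1); // small range: sort sequentially
            return;
        }
        int p = partition(array, start, end);
        // invokeAll schedules both halves on the pool and returns once both are done.
        invokeAll(new ForkJoinQuick(array, start, p - 1),
                  new ForkJoinQuick(array, p + 1, end));
    }

    // Same partition scheme as the question, with the middle element as pivot.
    // Like the original, it degrades when the data has many duplicate keys.
    static int partition(int[] a, int lo, int hi) {
        swap(a, lo + (hi - lo) / 2, hi);
        int pivot = a[hi];
        int store = lo;
        for (int i = lo; i < hi; i++) {
            if (a[i] <= pivot) {
                swap(a, i, store++);
            }
        }
        swap(a, store, hi);
        return store; // final index of the pivot
    }

    static void swap(int[] a, int i, int j) {
        int t = a[i];
        a[i] = a[j];
        a[j] = t;
    }

    // Entry point: blocks until the whole array is sorted.
    static void sort(int[] a) {
        ForkJoinPool.commonPool().invoke(new ForkJoinQuick(a, 0, a.length - 1));
    }
}
```

The pool's work stealing takes the place of the manual thread counting: idle workers pick up pending halves on their own, so there is no shared counter to keep consistent.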

A couple of points on the implementation.

Implement Runnable rather than extending Thread. Extend Thread only when you are creating a new kind of thread; when you just want some work to run in parallel, Runnable is the better fit. Implementing Runnable also leaves you free to extend another class, which gives you more flexibility in your OO design.

Use a thread pool restricted to the number of processors available on the system. Also, don't use num_threads to decide whether to fork off a new thread; you can calculate this up front. Use a minimum partition size equal to the size of the whole array divided by the number of available processors. Something like:

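import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ThreadedQuick implements Runnable {

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
    // Shared pool; call executor.shutdown() once all sorting is finished,
    // otherwise its non-daemon threads keep the JVM alive.
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

    final int[] my_array;
    final int start, end;

    // Ranges no longer than this are sorted in the current thread.
    private final int minPartitionSize;

    public ThreadedQuick(int minPartitionSize, int[] array, int start, int end) {
        this.minPartitionSize = minPartitionSize;
        this.my_array = array;
        this.start = start;
        this.end = end;
    }

    public void run() {
        quicksort(my_array, start, end);
    }

    // medianOfThree and swap are the question's helpers (not shown there either).
    public void quicksort(int[] array, int start, int end) {
        int len = end - start + 1;

        if (len <= 1)
            return;

        int pivot_index = medianOfThree(array, start, end);
        int pivotValue = array[pivot_index];

        // Park the pivot at the end while partitioning.
        swap(array, pivot_index, end);

        int storeIndex = start;
        for (int i = start; i < end; i++) {
            if (array[i] <= pivotValue) {
                swap(array, i, storeIndex);
                storeIndex++;
            }
        }

        // Move the pivot to its final position.
        swap(array, storeIndex, end);

        if (len > minPartitionSize) {
            // Large range: hand the left part to the pool, sort the right part here.
            ThreadedQuick quick = new ThreadedQuick(minPartitionSize, array, start, storeIndex - 1);
            Future<?> future = executor.submit(quick);
            quicksort(array, storeIndex + 1, end);

            try {
                // Caution: get() blocks this thread. If every pool thread is waiting
                // on a task that is still queued, the sort stalls until the timeout.
                future.get(1000, TimeUnit.SECONDS);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } else {
            quicksort(array, start, storeIndex - 1);
            quicksort(array, storeIndex + 1, end);
        }
    }
}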

You can kick it off by doing:

ThreadedQuick quick = new ThreadedQuick(array.length / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1);
quick.run();

This will start the sort in the same thread, which avoids an unnecessary thread hop at start up.

Caveat: Not sure the above implementation will actually be faster as I haven't benchmarked it.


This uses a combination of quick sort and merge sort.

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSortMain {
    public static void main(String... args) throws InterruptedException {
        Random rand = new Random();
        // 100M ints is about 400 MB, and the final merge needs a buffer of the
        // same size, so raise -Xmx or shrink the array when trying this out.
        final int[] values = new int[100*1024*1024];
        for (int i = 0; i < values.length; i++)
            values[i] = rand.nextInt();

        // Phase 1: sort one block per available processor, in parallel.
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        int blockSize = (values.length + threads - 1) / threads;
        for (int i = 0; i < values.length; i += blockSize) {
            final int min = i;
            final int max = Math.min(min + blockSize, values.length);
            es.submit(new Runnable() {
                @Override
                public void run() {
                    Arrays.sort(values, min, max);
                }
            });
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.MINUTES);

        // Phase 2: merge neighbouring sorted blocks, doubling the block size
        // until a single block covers the whole array.
        for (int blockSize2 = blockSize; blockSize2 < values.length; blockSize2 *= 2) {
            // Step by two blocks at a time so each pair is merged exactly once.
            for (int i = 0; i < values.length; i += blockSize2 * 2) {
                final int min = i;
                final int mid = Math.min(min + blockSize2, values.length);
                final int max = Math.min(min + blockSize2 * 2, values.length);
                mergeSort(values, min, mid, max);
            }
        }
    }

    // Merges the sorted runs [left, mid) and [mid, end) of values in place,
    // using a temporary buffer. The return value is unused.
    private static boolean mergeSort(int[] values, int left, int mid, int end) {
        int[] results = new int[end - left];
        int l = left, r = mid, m = 0;
        // Take the smaller head element while both runs still have elements.
        for (; l < mid && r < end; m++) {
            int lv = values[l];
            int rv = values[r];
            if (lv < rv) {
                results[m] = lv;
                l++;
            } else {
                results[m] = rv;
                r++;
            }
        }
        // Copy whatever is left of either run.
        while (l < mid)
            results[m++] = values[l++];
        while (r < end)
            results[m++] = values[r++];
        System.arraycopy(results, 0, values, left, results.length);
        return false;
    }
}


A couple of comments, if I understand your code right:

  1. I don't see a lock around the num_threads counter even though it can be accessed by multiple threads. Perhaps you should make it an AtomicInteger.

  2. Use a thread pool and arrange the tasks, i.e. a single call to quicksort, to take advantage of it. Use Futures.

The way you currently divide the work could leave a smaller partition with its own thread and a larger partition without one. That is to say, it doesn't give priority to handing the larger segments to their own threads.
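
The first point could be sketched as below: a counter that reserves a thread slot atomically, so the check and the increment cannot interleave between threads. `ThreadBudget` and its methods are hypothetical names for this example; the count starts at 1 to mirror `num_threads = 1` in the question, which accounts for the thread that starts the sort.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: class and method names are illustrative.
class ThreadBudget {
    private final int max;
    // Starts at 1 to count the thread that kicks off the sort.
    private final AtomicInteger active = new AtomicInteger(1);

    ThreadBudget(int max) {
        this.max = max;
    }

    // Atomically claims a slot; returns false when the budget is used up.
    boolean tryReserve() {
        while (true) {
            int current = active.get();
            if (current >= max) {
                return false;
            }
            // Succeeds only if no other thread changed the count in between.
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Gives a slot back; call when a worker thread has finished its range.
    void release() {
        active.decrementAndGet();
    }

    int active() {
        return active.get();
    }
}
```

In the question's code this would replace the `if (num_threads < MAX_THREADS) { num_threads++; ... }` test with `if (budget.tryReserve()) { ... }`, and `num_threads--` in `run()` with `budget.release()`.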
