
1. CountDownLatch

  1. package com.itlong.whatsmars.base.sync;
  2. import java.util.concurrent.CountDownLatch;
  3. /**
  4. * Created by shenhongxi on 2016/8/12.
  5. */
  6. public class CountDownLatchTest {
  7. public static void main(String[] args) {
  8. CountDownLatch latch = new CountDownLatch(3);
  9. long start = System.currentTimeMillis();
  10. for (int i = 0; i < 3; i++) {
  11. new Thread(new SubRunnable(i, latch)).start();
  12. }
  13. try {
  14. latch.await();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. System.out.println(System.currentTimeMillis() - start);
  19. System.out.println("Main finished");
  20. }
  21. static class SubRunnable implements Runnable {
  22. private int id = -1;
  23. private CountDownLatch latch;
  24. SubRunnable(int id, CountDownLatch latch) {
  25. this.id = id;
  26. this.latch = latch;
  27. }
  28. @Override
  29. public void run() {
  30. try {
  31. Thread.sleep(3000);
  32. System.out.println(String
  33. .format("Sub Thread %d finished", id));
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. } finally {
  37. latch.countDown();
  38. }
  39. }
  40. }
  41. }


2. ExecutorService

  1. package com.itlong.whatsmars.base.sync;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. /**
  8. * Created by shenhongxi on 2016/8/12.
  9. */
  10. public class CallableTest {
  11. public static void main(String[] args) throws Exception {
  12. ExecutorService pool = Executors.newFixedThreadPool(3);
  13. List<Callable<Void>> subs = new ArrayList<Callable<Void>>();
  14. for (int i = 0; i < 3; i++) {
  15. subs.add(new SubCallable(i));
  16. }
  17. long start = System.currentTimeMillis();
  18. try {
  19. pool.invokeAll(subs);
  20. } finally {
  21. pool.shutdown();
  22. }
  23. System.out.println(System.currentTimeMillis() - start);
  24. System.out.println("Main finished");
  25. }
  26. static class SubCallable implements Callable<Void> {
  27. private int id = -1;
  28. public SubCallable(int id) {
  29. this.id = id;
  30. }
  31. @Override
  32. public Void call() throws Exception {
  33. try {
  34. Thread.sleep(3000);
  35. System.out.println(String
  36. .format("Child Thread %d finished", id));
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. return null;
  41. }
  42. }
  43. }


  1. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2. throws InterruptedException {
  3. if (tasks == null)
  4. throw new NullPointerException();
  5. List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  6. boolean done = false;
  7. try {
  8. for (Callable<T> t : tasks) {
  9. RunnableFuture<T> f = newTaskFor(t);
  10. futures.add(f);
  11. execute(f);
  12. }
  13. for (Future<T> f : futures) {
  14. if (!f.isDone()) {
  15. try {
  16. f.get();
  17. } catch (CancellationException ignore) {
  18. } catch (ExecutionException ignore) {
  19. }
  20. }
  21. }
  22. done = true;
  23. return futures;
  24. } finally {
  25. if (!done)
  26. for (Future<T> f : futures)
  27. f.cancel(true);
  28. }
  29. }


  1. package com.itlong.whatsmars.base.sync;
  2. /**
  3. * Created by shenhongxi on 2016/8/12.
  4. * 子线程与主线程是顺序执行的,各子线程之间还是异步的
  5. */
  6. public class JoinTest {
  7. public static void main(String[] args) throws Exception {
  8. Thread t1 = new Thread(new SubRunnable(0));
  9. Thread t2 = new Thread(new SubRunnable(1));
  10. Thread t3 = new Thread(new SubRunnable(2));
  11. long start = System.currentTimeMillis();
  12. t1.start();
  13. t2.start();
  14. t3.start();
  15. t1.join();
  16. t2.join();
  17. t3.join();
  18. System.out.println(System.currentTimeMillis() - start);
  19. System.out.println("Main finished");
  20. }
  21. static class SubRunnable implements Runnable {
  22. private int id = -1;
  23. SubRunnable(int id) {
  24. this.id = id;
  25. }
  26. @Override
  27. public void run() {
  28. try {
  29. System.out.println("hi, I'm id-" + id);
  30. Thread.sleep(9000);
  31. System.out.println(String
  32. .format("Sub Thread %d finished", id));
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }


  1. public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor {
  2. private final Object poolSizeMonitor = new Object();
  3. private int corePoolSize = 1;
  4. private int maxPoolSize = Integer.MAX_VALUE;
  5. private int keepAliveSeconds = 60;
  6. private boolean allowCoreThreadTimeOut = false;
  7. private int queueCapacity = Integer.MAX_VALUE;
  8. private ThreadPoolExecutor threadPoolExecutor;
  9. /**
  10. * Set the ThreadPoolExecutor's core pool size.
  11. * Default is 1.
  12. * <p><b>This setting can be modified at runtime, for example through JMX.</b>
  13. */
  14. public void setCorePoolSize(int corePoolSize) {
  15. synchronized (this.poolSizeMonitor) {
  16. this.corePoolSize = corePoolSize;
  17. if (this.threadPoolExecutor != null) {
  18. this.threadPoolExecutor.setCorePoolSize(corePoolSize);
  19. }
  20. }
  21. }
  22. /**
  23. * Return the ThreadPoolExecutor's core pool size.
  24. */
  25. public int getCorePoolSize() {
  26. synchronized (this.poolSizeMonitor) {
  27. return this.corePoolSize;
  28. }
  29. }



  1. public interface ExecutorService extends Executor {
  2. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  3. throws InterruptedException;
  4. }
  5. public interface Executor {
  6. void execute(Runnable command);
  7. }
  8. public abstract class AbstractExecutorService implements ExecutorService{
  9. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
  10. // ...
  11. }
  12. }
  13. public class ThreadPoolExecutor extends AbstractExecutorService {
  14. public ThreadPoolExecutor(int corePoolSize,
  15. int maximumPoolSize,
  16. long keepAliveTime,
  17. TimeUnit unit,
  18. BlockingQueue<Runnable> workQueue) {
  19. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  20. Executors.defaultThreadFactory(), defaultHandler);
  21. }
  22. }



