java8并发学习

这里继续学习记录java8的并发知识。关于什么是并发,什么是并行,什么是进程,什么是线程,有什么关系区别等等就不贴出来啦。

并发在Java5中首次被引入并在后续的版本中不断得到增强。Java从JDK1.0开始执行线程。在开始一个新的线程之前,你必须指定由这个线程执行的代码,通常称为task。

线程与执行器

Runnable

我们可以通过实现Runnable——一个定义了一个无返回值无参数的run()方法的函数接口,来实现task。

  1. /**
  2.      * Runnable
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 下午3:19:58
  6.      */
  7.     private void testRunnable() {
  8.         Runnable task = () -> {
  9.             String threadName = Thread.currentThread().getName();
  10.             System.out.println(“Hello “ + threadName);
  11.         };
  12.         task.run(); // 非线程方式调用,还在主线程里
  13.         Thread thread = new Thread(task);
  14.         thread.start();
  15.         System.out.println(“Done!”);  // runnable是在打印’done’前执行还是在之后执行,顺序是不确定的
  16.     }

我们可以将线程休眠确定的时间。通过这种方法来模拟长时间运行的任务。

  1. /**
  2.      * 设置线程休眠时间,模拟长任务
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月17日 下午3:25:01
  6.      */
  7.     private void testRunnableWithSleep() {
  8.         Runnable runnable = () -> {
  9.             try {
  10.                 String name = Thread.currentThread().getName();
  11.                 System.out.println(“Foo “ + name);
  12.                 TimeUnit.SECONDS.sleep(1); // 休眠1s
  13.                 System.out.println(“Bar “ + name);
  14.             }
  15.             catch (InterruptedException e) {
  16.                 e.printStackTrace();
  17.             }
  18.         };
  19.         Thread thread = new Thread(runnable);
  20.         thread.start();
  21.     }

Executor

并发API引入了ExecutorService作为一个在程序中直接使用Thread的高层次的替换方案。Executors支持运行异步任务,通常管理一个线程池,这样一来我们就不需要手动去创建新的线程。在不断地处理任务的过程中,线程池内部线程将会得到复用,Java进程从没有停止!Executors必须显式的停止-否则它们将持续监听新的任务。

ExecutorService提供了两个方法来达到这个目的——shutdwon()会等待正在执行的任务执行完而shutdownNow()会终止所有正在执行的任务并立即关闭executor。

  1. ExecutorService executor = Executors.newSingleThreadExecutor(); // 单线程线程池
  2.         executor.submit(() -> {
  3.             String threadName = Thread.currentThread().getName();
  4.             System.out.println(“Hello “ + threadName);
  5.             try {
  6.                 TimeUnit.SECONDS.sleep(6);
  7.             } catch (InterruptedException e) {
  8.                 System.err.println(“my is interrupted”);
  9.             } // 休眠1s
  10.         });
  11.         // => Hello pool-1-thread-1
  12.         // Executors必须显式的停止-否则它们将持续监听新的任务
  13.         try {
  14.             System.out.println(“attempt to shutdown executor”);
  15.             executor.shutdown(); // 等待正在执行的任务执行完
  16.             executor.awaitTermination(5, TimeUnit.SECONDS); // 等待指定时间优雅关闭executor。在等待最长5s的时间后,executor最终会通过中断所有的正在执行的任务关闭
  17.             System.out.println(“wait for 5s to shutdown”);
  18.         } catch (InterruptedException e) {
  19.             System.err.println(“tasks interrupted”);
  20.         } finally {
  21.             if (!executor.isTerminated()) {
  22.                 System.err.println(“cancel non-finished tasks”);
  23.             }
  24.             executor.shutdownNow(); // 终止所有正在执行的任务并立即关闭executor
  25.             System.out.println(“shutdown finished”);
  26.         }

Callable

Callables也是类似于runnables的函数接口,不同之处在于,Callable返回一个值。一样提交给 executor services。在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果 123 之前执行完成。

  1. Callable<Integer> task = () -> {
  2.             try {
  3.                 TimeUnit.SECONDS.sleep(5); // 休眠5s后返回整数
  4.                 return 123;
  5.             }
  6.             catch (InterruptedException e) {
  7.                 throw new IllegalStateException(“task interrupted”, e);
  8.             }
  9.         };
  10.         ExecutorService executor = Executors.newFixedThreadPool(1); // 固定线程池
  11.         Future<Integer> future = executor.submit(task);
  12.         System.out.println(“future done? “ + future.isDone());
  13. //      executor.shutdownNow(); // 如果关闭executor,所有的未中止的future都会抛出异常。
  14.         Integer result = future.get(); // 在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果123之前执行完成
  15.         System.out.println(“future done? “ + future.isDone());
  16.         System.out.println(“result: “ + result);
  17.         executor.shutdownNow(); // 需要显式关闭
  18.         System.out.println(“result: “ + future.get());
  19.     }

任何future.get()调用都会阻塞,然后等待直到callable中止。在最糟糕的情况下,一个callable持续运行——因此使你的程序将没有响应。我们可以简单的传入一个时长来避免这种情况。

  1. ExecutorService executor = Executors.newFixedThreadPool(1);
  2.         Future<Integer> future = executor.submit(() -> {
  3.             try {
  4.                 TimeUnit.SECONDS.sleep(2);
  5.                 return 123;
  6.             }
  7.             catch (InterruptedException e) {
  8.                 throw new IllegalStateException(“task interrupted”, e);
  9.             }
  10.         });
  11.         // 任何future.get()调用都会阻塞,然后等待直到callable中止,传入超时时长终止
  12.         future.get(1, TimeUnit.SECONDS);  // 抛出 java.util.concurrent.TimeoutException

invokeAll

Executors支持通过invokeAll()一次批量提交多个callable。这个方法结果一个callable的集合,然后返回一个future的列表。

  1. ExecutorService executor = Executors.newWorkStealingPool(); // ForkJoinPool 一个并行因子数来创建,默认值为主机CPU的可用核心数
  2.         List<Callable<String>> callables = Arrays.asList(
  3.                 () -> “task1”,
  4.                 () -> “task2”,
  5.                 () -> “task3”);
  6.         executor.invokeAll(callables)
  7.             .stream()
  8.             .map(future -> { // 返回的所有future,并每一个future映射到它的返回值
  9.                 try {
  10.                     return future.get();
  11.                 }
  12.                 catch (Exception e) {
  13.                     throw new IllegalStateException(e);
  14.                 }
  15.             })
  16.             .forEach(System.out::println);

invokeAny

批量提交callable的另一种方式就是invokeAny(),它的工作方式与invokeAll()稍有不同。在等待future对象的过程中,这个方法将会阻塞直到第一个callable中止然后返回这一个callable的结果。

  1. private static Callable<String> callable(String result, long sleepSeconds) {
  2.         return () -> {
  3.             TimeUnit.SECONDS.sleep(sleepSeconds);
  4.             return result;
  5.         };
  6.     }
  7.     public static void main(String[] args) throws InterruptedException, ExecutionException {
  8.         ExecutorService executor = Executors.newWorkStealingPool();
  9.         List<Callable<String>> callables = Arrays.asList(
  10.         callable(“task1”2),
  11.         callable(“task2”1),
  12.         callable(“task3”3));
  13.         String result = executor.invokeAny(callables);
  14.         System.out.println(result); // task2
  15.     }

这个例子又使用了另一种方式来创建executor——调用newWorkStealingPool()。这个工厂方法是Java8引入的,返回一个ForkJoinPool类型的 executor,它的工作方法与其他常见的execuotr稍有不同。与使用一个固定大小的线程池不同,ForkJoinPools使用一个并行因子数来创建,默认值为主机CPU的可用核心数。

ScheduledExecutor

为了持续的多次执行常见的任务,我们可以利用调度线程池。ScheduledExecutorService支持任务调度,持续执行或者延迟一段时间后执行。调度一个任务将会产生一个专门的future类型——ScheduleFuture,它除了提供了Future的所有方法之外,他还提供了getDelay()方法来获得剩余的延迟。在延迟消逝后,任务将会并发执行。

为了调度任务持续的执行,executors 提供了两个方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一个方法用来以固定频率来执行一个任务,另一个方法等待时间是在一次任务的结束和下一个任务的开始之间

  1. /**
  2.      * 获取剩余延迟
  3.      * @throws InterruptedException
  4.      * @author wenqy
  5.      * @date 2020年1月17日 下午4:56:58
  6.      */
  7.     private static void scheduleDelay() throws InterruptedException {
  8.         ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  9.         Runnable task = () -> System.out.println(“Scheduling: “ + System.nanoTime());
  10.         ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS); // 3s后执行
  11.         TimeUnit.MILLISECONDS.sleep(1337);
  12.         long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
  13.         System.out.printf(“Remaining Delay: %sms\n”, remainingDelay); // 剩余的延迟
  14.         executor.shutdown();
  15.     }
  16.     /**
  17.      * 以固定频率来执行一个任务
  18.      * 
  19.      * @throws InterruptedException
  20.      * @author wenqy
  21.      * @date 2020年1月17日 下午4:57:45
  22.      */
  23.     private static void scheduleAtFixedRate() throws InterruptedException {
  24.         ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  25.         Runnable task = () -> {
  26.             System.out.println(“at fixed rate Scheduling: “ + System.nanoTime());
  27.             try {
  28.                 TimeUnit.SECONDS.sleep(2);
  29.             } catch (InterruptedException e) {
  30.                 e.printStackTrace();
  31.             }
  32.         };
  33.         int initialDelay = 0// 初始化延迟,用来指定这个任务首次被执行等待的时长
  34.         int period = 1;
  35.         executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS); // 不考虑任务的实际用时
  36.     }
  37.     /**
  38.      * 以固定延迟来执行一个任务
  39.      *  等待时间 period 是在一次任务的结束和下一个任务的开始之间
  40.      * @throws InterruptedException
  41.      * @author wenqy
  42.      * @date 2020年1月17日 下午5:03:28
  43.      */
  44.     private static void scheduleWithFixedDelay() throws InterruptedException {
  45.         ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  46.         Runnable task = () -> {
  47.             try {
  48.                 TimeUnit.SECONDS.sleep(2);
  49.                 System.out.println(“WithFixedDelay Scheduling: “ + System.nanoTime());
  50.             }
  51.             catch (InterruptedException e) {
  52.                 System.err.println(“task interrupted”);
  53.             }
  54.         };
  55.         executor.scheduleWithFixedDelay(task, 01, TimeUnit.SECONDS);
  56.     }

同步与锁

我们学到了如何通过执行器服务同时执行代码。当我们编写这种多线程代码时,我们需要特别注意共享可变变量的并发访问。

  1. int count = 0;
  2.     void increment() {
  3.         count = count + 1;
  4.     }

我们在不同的线程上共享可变变量,并且变量访问没有同步机制,这会产生竞争条件。上面例子被多个线程同时访问,就会出现未知的错误。

我们可以用synchronized关键字支持线程同步

  1. synchronized void incrementSync() {
  2.       count = count + 1;
  3. }

synchronized关键字也可用于语句块

  1. void incrementSync2() {
  2.         synchronized (this) {
  3.             count = count + 1;
  4.         }
  5.     }

java在内部使用所谓的“监视器”(monitor),也称为监视器锁(monitor lock)或内在锁( intrinsic lock)来管理同步。监视器绑定在对象上,例如,当使用同步方法时,每个方法都共享相应对象的相同监视器。

所有隐式的监视器都实现了重入(reentrant)特性。重入的意思是锁绑定在当前线程上。线程可以安全地多次获取相同的锁,而不会产生死锁(例如,同步方法调用相同对象的另一个同步方法)

并发API支持多种显式的锁,它们由Lock接口规定,用于代替synchronized的隐式锁。锁对细粒度的控制支持多种方法,因此它们比隐式的监视器具有更大的开销。

ReentrantLock

ReentrantLock类是互斥锁,与通过synchronized访问的隐式监视器具有相同行为,但是具有扩展功能。就像它的名称一样,这个锁实现了重入特性,就像隐式监视器一样。

锁可以通过lock()来获取,通过unlock()来释放。把你的代码包装在try-finally代码块中来确保异常情况下的解锁非常重要。

  1. /**
  2.  * 可重入锁
  3.  * 
  4.  * @author wenqy
  5.  * @date 2020年1月18日 下午3:41:50
  6.  */
  7. private void safeIncreByLock() {
  8.     count = 0;
  9.     ExecutorService executor = Executors.newFixedThreadPool(2);
  10.     executor.submit(() -> {
  11.         lock.lock();
  12.         try {
  13.             ConcurrentUtils.sleep(1);
  14.         } finally {
  15.             lock.unlock();
  16.         }
  17.     });
  18.     executor.submit(() -> {
  19.         System.out.println(“Locked: “ + lock.isLocked());
  20.         System.out.println(“Held by me: “ + lock.isHeldByCurrentThread());
  21.         boolean locked = lock.tryLock(); // 尝试拿锁而不阻塞当前线程
  22.         // 在访问任何共享可变变量之前,必须使用布尔值结果来检查锁是否已经被获取
  23.         System.out.println(“Lock acquired: “ + locked);
  24.     });
  25.     ConcurrentUtils.stop(executor);
  26. }

ReadWriteLock

ReadWriteLock接口规定了锁的另一种类型,包含用于读写访问的一对锁。读写锁的理念是,只要没有任何线程写入变量,并发读取可变变量通常是安全的。所以读锁可以同时被多个线程持有,只要没有线程持有写锁。这样可以提升性能和吞吐量,因为读取比写入更加频繁。

  1. /**
  2.  * 读写锁
  3.  * 
  4.  * @author wenqy
  5.  * @date 2020年1月18日 下午3:41:21
  6.  */
  7. private void readWriteLock() {
  8.     ExecutorService executor = Executors.newFixedThreadPool(2);
  9.     Map<String, String> map = new HashMap<>();
  10.     ReadWriteLock lock = new ReentrantReadWriteLock();
  11.     executor.submit(() -> {
  12.         lock.writeLock().lock();
  13.         try {
  14.             ConcurrentUtils.sleep(1);
  15.             map.put(“foo”“bar”);
  16.         } finally {
  17.             lock.writeLock().unlock();
  18.         }
  19.     });
  20.     Runnable readTask = () -> {
  21.         lock.readLock().lock();
  22.         try {
  23.             System.out.println(map.get(“foo”));
  24.             ConcurrentUtils.sleep(1);
  25.         } finally {
  26.             lock.readLock().unlock();
  27.         }
  28.     };
  29.     executor.submit(readTask);
  30.     executor.submit(readTask);
  31.     ConcurrentUtils.stop(executor);
  32.     // 两个读任务需要等待写任务完成。在释放了写锁之后,两个读任务会同时执行,并同时打印结果。
  33.     // 它们不需要相互等待完成,因为读锁可以安全同步获取
  34. }

StampedLock

Java 8 自带了一种新的锁,叫做StampedLock,它同样支持读写锁,就像上面的例子那样。与ReadWriteLock不同的是,StampedLock的锁方法会返回表示为long的标记。你可以使用这些标记来释放锁,或者检查锁是否有效。

  1. /**
  2.      * java8 StampedLock
  3.      *      tampedLock并没有实现重入特性,相同线程也要注意死锁
  4.      * @author wenqy
  5.      * @date 2020年1月18日 下午3:40:46
  6.      */
  7.     private void stampedLock() {
  8.         ExecutorService executor = Executors.newFixedThreadPool(2);
  9.         Map<String, String> map = new HashMap<>();
  10.         StampedLock lock = new StampedLock();
  11.         executor.submit(() -> {
  12.             long stamp = lock.writeLock(); // 读锁或写锁会返回一个标记
  13.             try {
  14.                 ConcurrentUtils.sleep(1);
  15.                 map.put(“foo”“bar”);
  16.             } finally {
  17.                 lock.unlockWrite(stamp);
  18.             }
  19.         });
  20.         Runnable readTask = () -> {
  21.             long stamp = lock.readLock();
  22.             try {
  23.                 System.out.println(map.get(“foo”));
  24.                 ConcurrentUtils.sleep(1);
  25.             } finally {
  26.                 lock.unlockRead(stamp);
  27.             }
  28.         };
  29.         executor.submit(readTask);
  30.         executor.submit(readTask);
  31.         ConcurrentUtils.stop(executor);
  32.     }

此外,StampedLock支持另一种叫做乐观锁(optimistic locking)的模式。乐观的读锁通过调用tryOptimisticRead()获取,它总是返回一个标记而不阻塞当前线程,无论锁是否真正可用。如果已经有写锁被拿到,返回的标记等于0。你需要总是通过lock.validate(stamp)检查标记是否有效。

  1. /**
  2.  * 乐观锁
  3.  *  乐观锁在刚刚拿到锁之后是有效的。和普通的读锁不同的是,乐观锁不阻止其他线程同时获取写锁。
  4.  *  在第一个线程暂停一秒之后,第二个线程拿到写锁而无需等待乐观的读锁被释放。
  5.  *  此时,乐观的读锁就不再有效了。甚至当写锁释放时,乐观的读锁还处于无效状态。
  6.  *  所以在使用乐观锁时,你需要每次在访问任何共享可变变量之后都要检查锁,来确保读锁仍然有效。
  7.  * 
  8.  * @author wenqy
  9.  * @date 2020年1月18日 下午3:49:31
  10.  */
  11. private void optimisticLock() {
  12.     System.out.println(“—–>optimisticLock—->”);
  13.     ExecutorService executor = Executors.newFixedThreadPool(2);
  14.     StampedLock lock = new StampedLock();
  15.     executor.submit(() -> {
  16.         long stamp = lock.tryOptimisticRead();
  17.         try {
  18.             System.out.println(“Optimistic Lock Valid: “ + lock.validate(stamp));
  19.             ConcurrentUtils.sleep(1);
  20.             System.out.println(“Optimistic Lock Valid: “ + lock.validate(stamp));
  21.             ConcurrentUtils.sleep(2);
  22.             System.out.println(“Optimistic Lock Valid: “ + lock.validate(stamp));
  23.         } finally {
  24.             lock.unlock(stamp);
  25.         }
  26.     });
  27.     executor.submit(() -> {
  28.         long stamp = lock.writeLock();
  29.         try {
  30.             System.out.println(“Write Lock acquired”);
  31.             ConcurrentUtils.sleep(2);
  32.         } finally {
  33.             lock.unlock(stamp);
  34.             System.out.println(“Write done”);
  35.         }
  36.     });
  37.     ConcurrentUtils.stop(executor);
  38. }
  39. /**
  40.  * 读锁转换为写锁
  41.  * 
  42.  * @author wenqy
  43.  * @date 2020年1月18日 下午4:00:10
  44.  */
  45. private void convertToWriteLock() {
  46.     count = 0;
  47.     System.out.println(“—–>convertToWriteLock—->”);
  48.     ExecutorService executor = Executors.newFixedThreadPool(2);
  49.     StampedLock lock = new StampedLock();
  50.     executor.submit(() -> {
  51.         long stamp = lock.readLock();
  52.         try {
  53.             if (count == 0) {
  54.                 stamp = lock.tryConvertToWriteLock(stamp); // 读锁转换为写锁而不用再次解锁和加锁
  55.                 if (stamp == 0L) { // 调用不会阻塞,但是可能会返回为零的标记,表示当前没有可用的写锁
  56.                     System.out.println(“Could not convert to write lock”);
  57.                     stamp = lock.writeLock(); // 阻塞当前线程,直到有可用的写锁
  58.                 }
  59.                 count = 23;
  60.             }
  61.             System.out.println(count);
  62.         } finally {
  63.             lock.unlock(stamp);
  64.         }
  65.     });
  66.     ConcurrentUtils.stop(executor);
  67. }

信号量

除了锁之外,并发API也支持计数的信号量。不过锁通常用于变量或资源的互斥访问,信号量可以维护整体的准入许可。

  1. /**
  2.      * 信号量
  3.      * 
  4.      * @author wenqy
  5.      * @date 2020年1月18日 下午4:13:11
  6.      */
  7.     private void doSemaphore() {
  8.         ExecutorService executor = Executors.newFixedThreadPool(10);
  9.         Semaphore semaphore = new Semaphore(5); // 并发访问总数
  10.         Runnable longRunningTask = () -> {
  11.             boolean permit = false;
  12.             try {
  13.                 permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
  14.                 if (permit) {
  15.                     System.out.println(“Semaphore acquired”);
  16.                     ConcurrentUtils.sleep(5);
  17.                 } else { // 等待超时之后,会向控制台打印不能获取信号量的结果
  18.                     System.out.println(“Could not acquire semaphore”);
  19.                 }
  20.             } catch (InterruptedException e) {
  21.                 throw new IllegalStateException(e);
  22.             } finally {
  23.                 if (permit) {
  24.                     semaphore.release();
  25.                 }
  26.             }
  27.         };
  28.         IntStream.range(010)
  29.             .forEach(i -> executor.submit(longRunningTask));
  30.         ConcurrentUtils.stop(executor);
  31.     }

原子变量

java.concurrent.atomic包包含了许多实用的类,用于执行原子操作。如果你能够在多线程中同时且安全地执行某个操作,而不需要synchronized关键字或锁,那么这个操作就是原子的。

本质上,原子操作严重依赖于比较与交换(CAS),它是由多数现代CPU直接支持的原子指令。这些指令通常比同步块要快。所以在只需要并发修改单个可变变量的情况下,我建议你优先使用原子类,而不是锁。可以看下java包提供的一些原子变量例子AtomicInteger、LongAdder 、LongAccumulator、concurrentHashMap等等。

  1. /**
  2.      * AtomicInteger
  3.      *      incrementAndGet
  4.      * @author wenqy
  5.      * @date 2020年1月18日 下午4:27:10
  6.      */
  7.     private void atomicIntegerIncre() {
  8.         AtomicInteger atomicInt = new AtomicInteger(0);
  9.         ExecutorService executor = Executors.newFixedThreadPool(2);
  10.         IntStream.range(01000)
  11.             .forEach(i -> executor.submit(atomicInt::incrementAndGet)); // ++
  12.         ConcurrentUtils.stop(executor);
  13.         System.out.println(atomicInt.get());    // => 1000
  14.     }
  15.     /**
  16.      * AtomicInteger
  17.      *  updateAndGet
  18.      * @author wenqy
  19.      * @date 2020年1月18日 下午4:29:26
  20.      */
  21.     private void atomicIntegerUpdateAndGet() {
  22.         AtomicInteger atomicInt = new AtomicInteger(0);
  23.         ExecutorService executor = Executors.newFixedThreadPool(2);
  24.         IntStream.range(01000)
  25.             .forEach(i -> {
  26.                 Runnable task = () ->
  27.                     atomicInt.updateAndGet(n -> n + 2); // 结果累加2
  28.                 executor.submit(task);
  29.             });
  30.         ConcurrentUtils.stop(executor);
  31.         System.out.println(atomicInt.get());    // => 2000
  32.     }
  33.     /**
  34.      * LongAdder
  35.      *      AtomicLong的替代,用于向某个数值连续添加值
  36.      *      内部维护一系列变量来减少线程之间的争用,而不是求和计算单一结果
  37.      *      当多线程的更新比读取更频繁时,这个类通常比原子数值类性能更好。
  38.      *      这种情况在抓取统计数据时经常出现,例如,你希望统计Web服务器上请求的数量。
  39.      *      LongAdder缺点是较高的内存开销,因为它在内存中储存了一系列变量。
  40.      * @author wenqy
  41.      * @date 2020年1月18日 下午4:33:29
  42.      */
  43.     private void longAdder() {
  44.         LongAdder adder = new LongAdder();
  45.         ExecutorService executor = Executors.newFixedThreadPool(2);
  46.         IntStream.range(01000)
  47.             .forEach(i -> executor.submit(adder::increment));
  48.         ConcurrentUtils.stop(executor);
  49.         System.out.println(adder.sumThenReset());   // => 1000
  50.     }
  51.     /**
  52.      * LongAccumulator
  53.      *      LongAccumulator是LongAdder的更通用的版本
  54.      *      内部维护一系列变量来减少线程之间的争用
  55.      * @author wenqy
  56.      * @date 2020年1月18日 下午4:35:11
  57.      */
  58.     private void longAccumulator() {
  59.         LongBinaryOperator op = (x, y) -> 2 * x + y;
  60.         LongAccumulator accumulator = new LongAccumulator(op, 1L);
  61.         ExecutorService executor = Executors.newFixedThreadPool(2);
  62.         // i=0  2 * 1 + 0 = 2;
  63.         // i=2  2 * 2 + 2 = 6;
  64.         // i=3  2 * 6 + 3 = 15;
  65.         // i=4  2 * 15 + 4 = 34;
  66.         IntStream.range(010)
  67.             .forEach(i -> executor.submit(() -> {
  68.                         accumulator.accumulate(i);
  69.                         System.out.println(“i:” + i + ” result:” + accumulator.get());
  70.                     })
  71.                 );
  72.         // 初始值为1。每次调用accumulate(i)的时候,当前结果和值i都会作为参数传入lambda表达式。
  73.         ConcurrentUtils.stop(executor);
  74.         System.out.println(accumulator.getThenReset());     // => 2539
  75.     }
  76.     /**
  77.      * concurrentMap
  78.      * 
  79.      * @author wenqy
  80.      * @date 2020年1月18日 下午4:38:09
  81.      */
  82.     private void concurrentMap() {
  83.         System.out.println(“—–>concurrentMap—–>”);
  84.         ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
  85.         map.put(“foo”“bar”);
  86.         map.put(“han”“solo”);
  87.         map.put(“r2”“d2”);
  88.         map.put(“c3”“p0”);
  89.         map.forEach((key, value) -> System.out.printf(“%s = %s\n”, key, value));
  90.         String value = map.putIfAbsent(“c3”“p1”);
  91.         System.out.println(value);    // p0  提供的键不存在时,将新的值添加到映射
  92.         System.out.println(map.getOrDefault(“hi”“there”));    // there 传入的键不存在时,会返回默认值
  93.         map.replaceAll((key, val) -> “r2”.equals(key) ? “d3” : val);
  94.         System.out.println(map.get(“r2”));    // d3
  95.         map.compute(“foo”, (key, val) -> val + val);
  96.         System.out.println(map.get(“foo”));   // barbar 转换单个元素,而不是替换映射中的所有值
  97.         map.merge(“foo”“boo”, (oldVal, newVal) -> newVal + ” was “ + oldVal);
  98.         System.out.println(map.get(“foo”));   // boo was foo
  99.     }
  100.     /**
  101.      * concurrentHashMap
  102.      * 
  103.      * @author wenqy
  104.      * @date 2020年1月18日 下午4:38:42
  105.      */
  106.     private void concurrentHashMap() {
  107.         System.out.println(“—–>concurrentHashMap—–>”);
  108.         ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
  109.         map.put(“foo”“bar”);
  110.         map.put(“han”“solo”);
  111.         map.put(“r2”“d2”);
  112.         map.put(“c3”“p0”);
  113.         map.forEach(1, (key, value) ->
  114.         System.out.printf(“key: %s; value: %s; thread: %s\n”,
  115.             key, value, Thread.currentThread().getName())); // 可以并行迭代映射中的键值对
  116.         String result = map.search(1, (key, value) -> {
  117.             System.out.println(Thread.currentThread().getName());
  118.             if (“foo”.equals(key)) { // 当前的键值对返回一个非空的搜索结果
  119.                 return value; // 只要返回了非空的结果,就不会往下搜索了
  120.             }
  121.             return null;
  122.         }); // ConcurrentHashMap是无序的。搜索函数应该不依赖于映射实际的处理顺序
  123.         System.out.println(“Result: “ + result);
  124.         String searchResult = map.searchValues(1, value -> {
  125.             System.out.println(Thread.currentThread().getName());
  126.             if (value.length() > 3) {
  127.                 return value;
  128.             }
  129.             return null;
  130.         }); // 搜索映射中的值
  131.         System.out.println(“Result: “ + searchResult);
  132.         String reduceResult = map.reduce(1,
  133.             (key, value) -> {
  134.                 System.out.println(“Transform: “ + Thread.currentThread().getName());
  135.                 return key + “=” + value;
  136.             },
  137.             (s1, s2) -> {
  138.                 System.out.println(“Reduce: “ + Thread.currentThread().getName());
  139.                 return s1 + “, “ + s2;
  140.             });
  141.         // 第一个函数将每个键值对转换为任意类型的单一值。
  142.         // 第二个函数将所有这些转换后的值组合为单一结果,并忽略所有可能的null值
  143.         System.out.println(“Result: “ + reduceResult);
  144.     }

我们学了java8的一些新特性和并发编程例子,暂且告一段落了,demo已上传至github:https://github.com/wenqy/java-study

参考

https://github.com/winterbe/java8-tutorial java8教程

https://wizardforcel.gitbooks.io/modern-java/content/ 中文译站

https://github.com/wenqy/java-study 学习例子

发表评论

电子邮件地址不会被公开。 必填项已用*标注

5 + 17 = ?