参考: 《Scala函数式编程》、tiger-xc博客

一、函数式编程介绍


1、什么是函数式编程

函数没有副作用

一个咖啡厅的设计

package cn.rectcircle.scala.fpbook.chapter01

/** A credit card. Plain class, so two cards are equal only if they are the same instance. */
class CreditCard

/** Anything the cafe can sell. */
trait Coffee {
	/** Price of one cup. */
	val price:Double
}

/** The only coffee on the menu :) */
class Java extends Coffee{
	override val price: Double = 1.0+1.1+1.2+1.3+1.4+5+6+7+8+9
}

/**
  * A charge described as a value (card + amount) instead of a side effect,
  * so purchases can be combined and tested without touching a payment system.
  */
case class Charge(cc:CreditCard, amount: Double){
	/**
	  * Merges two charges made to the same card into one.
	  *
	  * @throws IllegalArgumentException if `other` belongs to a different card
	  */
	def combine(other: Charge):Charge =
		if (cc == other.cc) Charge(cc, amount+other.amount)
		else throw new IllegalArgumentException("不能合并不同信用卡的价格")
}

/** The coffee shop: every purchase returns the goods together with the charge to be billed. */
object Cafe{
	/** Buys a single cup and returns it with its (not yet executed) charge. */
	def buyCoffee(cc: CreditCard):(Coffee, Charge) = {
		val cup = new Java
		(cup, Charge(cc, cup.price))
	}

	/**
	  * Buys `n` cups and returns them with one combined charge.
	  *
	  * Folding from a zero charge (instead of `reduce`) means an empty order
	  * (`n <= 0`) yields `(Nil, Charge(cc, 0.0))` rather than throwing.
	  */
	def buyCoffees(cc:CreditCard, n:Int):(List[Coffee], Charge) = {
		val (coffees, charges) = List.fill(n)(buyCoffee(cc)).unzip
		(coffees, charges.foldLeft(Charge(cc, 0.0))(_ combine _))
	}
}

/** Demo entry point: buys one cup, then four, and prints each result on its own line. */
object CafeExample extends App {
	val cc = new CreditCard
	// println (not print) so the two results do not run together on one line.
	println(Cafe.buyCoffee(cc))
	println(Cafe.buyCoffees(cc, 4))
}

java版

package cn.rectcircle.java.fpbook.chapter01;


import cn.rectcircle.java.util.Tuple.Tuple2;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** A credit card; compared by reference. */
class CreditCard{}

/** Anything the cafe can sell. */
interface Coffee{
	/** @return price of one cup */
	Double price();
}

/** The only coffee on the menu :) */
class Java implements Coffee{
	@Override
	public Double price() {
		return 1.0+1.1+1.2+1.3+1.4+5+6+7+8+9;
	}
}

/** An immutable charge: which card to bill and how much. */
class Charge {
	private final CreditCard cc;
	private final Double amount;


	public Charge(CreditCard cc, Double amount){
		this.cc = cc;
		this.amount = amount;
	}

	/**
	 * Merges two charges made to the same card.
	 *
	 * @param other the charge to add to this one
	 * @return a new charge for the summed amount
	 * @throws IllegalArgumentException if the two charges belong to different cards
	 *         (previously this returned {@code null}, which only failed later and far
	 *         from the cause)
	 */
	public Charge combine(Charge other) {
		if(cc != other.cc) {
			throw new IllegalArgumentException("不能合并不同信用卡的价格");
		}
		return new Charge(cc, amount + other.amount);
	}
}

/** The coffee shop: every purchase returns the goods together with the charge to be billed. */
class Cafe {
	/** Buys a single cup and returns it with its charge. */
	public static Tuple2<Coffee, Charge> buyCoffee(CreditCard cc) {
		Coffee cup = new Java();
		return Tuple2.of(cup, new Charge(cc, cup.price()));
	}

	/**
	 * Buys {@code n} cups and returns them with one combined charge.
	 *
	 * The charges are reduced from a zero charge, so the result is a plain
	 * {@code Charge} (the identity-less {@code reduce} yields an {@code Optional},
	 * which does not match the declared return type) and an empty order gives
	 * a charge of 0.0.
	 *
	 * @param cc card to bill
	 * @param n  number of cups; must not be negative ({@code Stream.limit} rejects it)
	 */
	public static Tuple2<List<Coffee>, Charge> buyCoffees(CreditCard cc, Integer n){
		final List<Tuple2<Coffee, Charge>> purchases = Stream
				.generate(() -> buyCoffee(cc))
				.limit(n)
				.collect(Collectors.toList());
		final List<Coffee> coffees = purchases.stream()
				.map(purchase -> purchase._1)
				.collect(Collectors.toList());
		final Charge total = purchases.stream()
				.map(purchase -> purchase._2)
				.reduce(new Charge(cc, 0.0), Charge::combine);
		return Tuple2.of(coffees, total);
	}
}

/** Demo entry point: buys one cup, then four, and prints both results. */
public class CafeExample {

	public static void main(String[] args) {
		final CreditCard card = new CreditCard();
		System.out.println(Cafe.buyCoffee(card));
		System.out.println(Cafe.buyCoffees(card, 4));
	}
}

2、在scala、java中使用函数式编程

几个例题

package cn.rectcircle.scala.fpbook.chapter02


/** Chapter 2 exercises: currying, uncurrying and function composition. */
object Exercise extends App {
	/** Turns a two-argument function into a chain of one-argument functions. */
	def curry[A, B, C](f: (A, B) => C): A => (B => C) =
		f.curried

	/** Inverse of [[curry]]: collapses a curried function back into a two-argument one. */
	def uncurry[A, B, C](f: A => B => C): (A, B) => C =
		Function.uncurried(f)

	/** Composition: the result applies `g` first, then `f`. */
	def compose[A, B, C](f: B => C, g: A => B): A => C =
		f compose g

	def add(a:Int, b:Int) = a+b

	// The same sum computed three ways; each line prints 3.
	println(add(1,2))
	val curriedAdd = curry(add)
	println(curriedAdd(1)(2))
	println(uncurry(curriedAdd)(1,2))
}

java8 描述

package cn.rectcircle.java.fpbook.chapter02;

import scala.Function1;
import scala.Function2;
import static cn.rectcircle.java.fpbook.chapter02.FunctionalModule.*;

/** Java counterparts of the chapter 2 exercises, written against Scala's function types. */
class FunctionalModule{

	/** Currying: turns a two-argument function into a chain of one-argument functions. */
	public static <A, B, C> Function1<A, Function1<B,C>> curry(Function2<A, B, C> f){
		return first -> second -> f.apply(first, second);
	}

	/** Uncurrying: collapses a curried function back into a two-argument one. */
	public static <A, B, C> Function2<A, B, C> uncurry(Function1<A, Function1<B,C>> f){
		return (first, second) -> {
			Function1<B, C> partiallyApplied = f.apply(first);
			return partiallyApplied.apply(second);
		};
	}

	/** Composition: the result applies {@code g} first, then {@code f}. */
	public static <A, B, C> Function1<A, C> compose(Function1<B, C> f, Function1<A, B> g){
		return input -> {
			B intermediate = g.apply(input);
			return f.apply(intermediate);
		};
	}
}


/** Demo entry point: computes 1 + 2 directly, curried, and curried-then-uncurried. */
public class Exercise {

	public static void main(String[] args) {
		Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
		Function1<Integer, Function1<Integer, Integer>> curriedAdd = curry(add);

		System.out.println(add.apply(1, 2));
		System.out.println(curriedAdd.apply(1).apply(2));
		System.out.println(uncurry(curry(add)).apply(1, 2));
	}
}

3、函数式数据结构

(1)List的实现

package cn.rectcircle.scala.fpbook.chapter03

import scala.annotation.tailrec

/** A singly linked, immutable list: either [[Nil]] or a [[Cons]] cell. */
sealed trait List[+A]
/** The empty list. */
case object Nil extends List[Nothing]

/** A non-empty list: a first element followed by the rest of the list. */
case class Cons[+A](head:A, tail:List[A]) extends List[A]

/** Functions over [[List]]. Methods written with plain recursion are not stack-safe. */
object List{
	/** Sum of the elements; 0 for the empty list. */
	def sum(ints: List[Int]):Int = ints match {
		case Nil => 0
		case Cons(x,xs) => x+sum(xs)
	}

	/** Product of the elements; 1.0 for the empty list. Stops at the first 0.0. */
	def product(ds:List[Double]):Double = ds match {
		case Nil => 1.0
		case Cons(0.0, _) => 0.0
		case Cons(x, xs) => x * product(xs)
	}

	/** Builds a list from the given elements, e.g. `List(1, 2, 3)`. */
	def apply[A](as:A*): List[A] =
		if (as.isEmpty) Nil
		else Cons(as.head, apply(as.tail:_*))

	/**
	  * Everything except the first element.
	  * @throws UnsupportedOperationException on the empty list
	  */
	def tail[A](l:List[A]):List[A] = l match {
		case Cons(_, tl) => tl
		case Nil => throw new UnsupportedOperationException("tail of empty list")
	}

	/**
	  * Replaces the first element with `e`.
	  * @throws UnsupportedOperationException on the empty list
	  */
	def setHead[A](l:List[A], e:A):List[A] = l match {
		case Cons(_, tl) => Cons(e, tl)
		// The message names this operation (it used to say "tail of empty list").
		case Nil => throw new UnsupportedOperationException("setHead of empty list")
	}

	/**
	  * Removes the first `n` elements. A non-positive `n` removes nothing
	  * (a negative `n` used to remove the whole list); an `n` larger than the
	  * list yields `Nil`.
	  */
	@annotation.tailrec
	def drop[A](l:List[A], n:Int):List[A] =
		if(n <= 0) l
		else l match {
			case Nil => Nil
			case Cons(_, tl) => drop(tl, n-1)
		}

	/** Removes the longest prefix whose elements all satisfy `f`. */
	@annotation.tailrec
	def dropWhile[A](l:List[A])(f:A=>Boolean):List[A] =
		l match {
			case Cons(h,t) if f(h) => dropWhile(t)(f)
			case _ => l
		}

	/** Concatenates `a1` and `a2`; only `a1` is copied. */
	def append[A](a1:List[A], a2:List[A]):List[A]=
		a1 match {
			case Nil => a2
			case Cons(h,t) => Cons(h, append(t, a2))
		}

	/**
	  * Everything except the last element.
	  * @throws UnsupportedOperationException on the empty list
	  */
	def init[A](l:List[A]):List[A] = l match {
		// The message names this operation (it used to say "tail of empty list").
		case Nil => throw new UnsupportedOperationException("init of empty list")
		case Cons(_, Nil) => Nil
		case Cons(h, t) => Cons(h, init(t))
	}

	/** Right fold: `f(a1, f(a2, ... f(an, z)))`. */
	def foldRight[A, B](as:List[A], z:B)(f:(A, B)=>B):B = as match {
		case Nil => z
		case Cons(h, t) => f(h, foldRight(t, z)(f))
	}

	/** [[sum]] via [[foldRight]]. */
	def sum2(ns:List[Int]):Int =
		foldRight(ns, 0)((x, y)=> x+y)

	/** [[product]] via [[foldRight]] (without the 0.0 shortcut). */
	def product2(ns:List[Double]):Double =
		foldRight(ns, 1.0)(_*_)

	/** Number of elements, via [[foldRight]]. */
	def length[A](as:List[A]):Int =
		foldRight(as, 0)((_, z)=> z+1)

	/** Left fold: `f(... f(f(z, a1), a2) ..., an)`; tail-recursive. */
	@annotation.tailrec
	def foldLeft[A, B](as:List[A], z:B)(f:(B,A)=>B):B =as match {
		case Nil => z
		case Cons(h, t) => foldLeft(t, f(z, h))(f)
	}

	/** [[sum]] via [[foldLeft]]. */
	def sum3(l: List[Int]):Int = foldLeft(l, 0)(_ + _)
	/** [[product]] via [[foldLeft]]. */
	def product3(l: List[Double]):Double = foldLeft(l, 1.0)(_ * _)

	/** Number of elements, via [[foldLeft]]. */
	def length2[A](l: List[A]): Int = foldLeft(l, 0)((acc,_) => acc + 1)

	/** The elements in reverse order. */
	def reverse[A](l:List[A]):List[A] =
		foldLeft(l, Nil:List[A])((z, a)=>Cons(a,z))

	/** [[foldRight]] expressed as a left fold over the reversed list. */
	def foldRightViaFoldLeft[A,B](l: List[A], z: B)(f: (A,B) => B): B =
		foldLeft(reverse(l), z)((b,a) => f(a,b))

	/**
	  * [[foldRight]] expressed as a left fold that builds up a function `B => B`
	  * and finally applies it to `z`.
	  */
	def foldRightViaFoldLeft_1[A,B](l: List[A], z: B)(f: (A,B) => B): B =
		foldLeft(l, (b:B) => b)((g,a) => b => g(f(a,b)))(z)
	// Worked example with l = List(1,2,3), f = (a, b) => a+b, z = 0,
	// where the accumulator starts as g = b => b and f1 = (g, a) => b => g(f(a,b)):
	/*
	foldLeft(List(1,2,3), b->b)(f1) (0)
	foldLeft(List(2,3), b->f(1, b))(f1) (0)
	foldLeft(List(3), b->f(1, f(2, b)) )(f1) (0)
	foldLeft(Nil, b->f(1, f(2,f(3, b))))(f1) (0)

	b-> f(1, f(2,f(3, b))) (0) =
	b-> f(1, f(2, 3+0)) =
	b-> f(1, 2+3) =
	b-> 1+2+3 = 6
	 */

	/** The identity function (the lambda-calculus I combinator). */
	def I[A](a :A):A = a

	/** [[append]] via [[foldRight]]. */
	def appendViaFoldRight[A](l: List[A], r: List[A]): List[A] =
		foldRight(l, r)(Cons(_,_))

	// Alternative that flattens a varargs of lists:
//	def concat[A](ls:List[A]*):List[A] =
//		ls.foldLeft(Nil:List[A])((z, l) => append(z, l))

	/** Flattens a list of lists into one list. */
	def concat[A](l: List[List[A]]): List[A] =
		foldRight(l, Nil:List[A])(append)

	/** Applies `f` to every element. */
	def map[A,B](l:List[A])(f:A=>B):List[B] =
		foldRight(l, Nil:List[B])((a, z) => Cons(f(a), z))

	/** Keeps only the elements that satisfy `f`. */
	def filter[A](l:List[A])(f: A=>Boolean):List[A] =
		foldRight(l, Nil:List[A])((a,z)=> if(f(a)) Cons(a,z) else z)

	/** Maps every element to a list and concatenates the results. */
	def flatMap[A, B](l:List[A])(f: A=>List[B]):List[B] =
		//foldRight(l, Nil:List[B])((a,z)=> append(f(a), z))
		concat(map(l)(f))

	/** [[filter]] via [[flatMap]]. */
	def filterViaFlatMap[A](l:List[A])(f: A=>Boolean):List[A] =
		flatMap(l)(a => if(f(a)) List(a) else Nil)


	/** Combines two lists element by element; stops at the shorter one. */
	def zipWith[A,B,C](a: List[A], b: List[B])(f: (A,B) => C): List[C] = (a,b) match {
		case (Nil, _) => Nil
		case (_, Nil) => Nil
		case (Cons(h1,t1), Cons(h2,t2)) => Cons(f(h1,h2), zipWith(t1,t2)(f))
	}

	/** True when `prefix` is a prefix of `l` (the empty list is a prefix of every list). */
	@annotation.tailrec
	def startsWith[A](l: List[A], prefix: List[A]): Boolean = (l,prefix) match {
		case (_,Nil) => true
		case (Cons(h,t),Cons(h2,t2)) if h == h2 => startsWith(t, t2)
		case _ => false
	}

	/** True when `sub` occurs as a contiguous run of elements inside `sup`. */
	@annotation.tailrec
	def hasSubsequence[A](sup: List[A], sub: List[A]): Boolean = sup match {
		case Nil => sub == Nil
		case _ if startsWith(sup, sub) => true
		case Cons(_,t) => hasSubsequence(t, sub)
	}
}

/** Smoke test that prints the results of a handful of `List` operations. */
object Test extends App{
	val l = List(1,2,3)
	println(l)

	// Basic accessors and updates.
	println(List.sum(l))
	println(List.tail(l))
	println(List.setHead(l, 2))

	println(List.dropWhile(l)(_ < 2))

	// Folding with Cons and Nil rebuilds the same list.
	println(
		List.foldRight(List(1,2,3,4), Nil:List[Int])((h, t) => Cons(h, t))
	)

	println(List.length(l))
	println(List.foldLeft(l, 0)((acc, x) => acc + x))
}

(2)Tree的实现

package cn.rectcircle.scala.fpbook.chapter03

/** A binary tree that stores values only in its leaves. */
sealed trait Tree[+A]
/** A leaf holding one value. */
case class Leaf[A](value: A) extends Tree[A]
/** An inner node with exactly two subtrees. */
case class Branch[A](left:Tree[A], right:Tree[A]) extends Tree[A]

object Tree {
	/** Number of nodes, counting both leaves and branches. */
	def size[A](t:Tree[A]):Int = t match {
		case Leaf(_) => 1
		case Branch(l, r) => 1 + size(l) + size(r)
	}

	/** Largest value stored in the tree. */
	def maximum(t:Tree[Int]):Int = t match {
		case Leaf(v) => v
		case Branch(l, r) => maximum(l) max maximum(r)
	}

	/**
	  * Length of the longest path from the root to a leaf, so a single leaf has
	  * depth 0. This now agrees with [[depthViaFold]]; it used to count a leaf
	  * as 1 and was therefore always one larger.
	  */
	def depth[A](t:Tree[A]):Int = t match {
		case Leaf(_) => 0
		case Branch(l, r) => 1 + (depth(l) max depth(r))
	}

	/** Applies `f` to every leaf value, keeping the shape of the tree. */
	def map[A, B](t:Tree[A])(f:A=>B):Tree[B] = t match {
		case Leaf(v) => Leaf(f(v))
		case Branch(l,r) => Branch(map(l)(f), map(r)(f))
	}

	/**
	  * A list-style fold kept as an example of the wrong generalisation: it
	  * threads one accumulator through the leaves from right to left and never
	  * sees the branch structure, so `size` or `depth` cannot be built on it.
	  */
	def fold1[A, B](t:Tree[A], z:B)(f:(A, B)=>B):B = t match {
		case Leaf(v) => f(v, z)
		case Branch(l, r) => fold1(l, fold1(r, z)(f))(f)
	}

	/** Structural fold: `f` handles a leaf, `g` combines the results of the two subtrees. */
	def fold[A,B](t: Tree[A])(f: A => B)(g: (B,B) => B): B = t match {
		case Leaf(a) => f(a)
		case Branch(l,r) => g(fold(l)(f)(g), fold(r)(f)(g))
	}

	/** [[size]] via [[fold]]. */
	def sizeViaFold[A](t: Tree[A]): Int =
		fold(t)(_ => 1)(1 + _ + _)

	/** [[maximum]] via [[fold]]. */
	def maximumViaFold(t: Tree[Int]): Int =
		fold(t)(a => a)(_ max _)

	/** [[depth]] via [[fold]]. */
	def depthViaFold[A](t: Tree[A]): Int =
		fold(t)(_ => 0)((d1,d2) => 1 + (d1 max d2))

	/** [[map]] via [[fold]]; the ascription widens `Leaf[B]` so both branches unify as `Tree[B]`. */
	def mapViaFold[A,B](t: Tree[A])(f: A => B): Tree[B] =
		fold(t)(a => Leaf(f(a)): Tree[B])(Branch(_,_))

}

/** Smoke test: sums the leaves of a three-leaf tree with `fold1` and prints the result. */
object TreeTest extends App{
	val t = Branch(Branch(Leaf(1),Leaf(2)),Leaf(3))
	println(Tree.fold1(t, 0)((leaf, acc) => leaf + acc))

}

4、不使用异常来处理错误

(1)Option

package cn.rectcircle.scala.fpbook.chapter04

/** An optional value: either [[Some]] wrapping a value or [[None]]. */
sealed trait Option[+A] {
	/** Applies `f` to the value if there is one. */
	def map[B](f:A=>B):Option[B] = this match {
		case Some(get) => Some(f(get))
		case None => None
	}

	/** The wrapped value, or `default` (evaluated only when needed) for `None`. */
	def getOrElse[B>:A](default: => B):B = this match {
		case Some(get) => get
		case None => default
	}

	/** Applies `f`, which may itself fail, to the value if there is one. */
	def flatMap[B](f:A=>Option[B]):Option[B] = map(f).getOrElse(None)

	/**
	  * This option if it is defined, otherwise `ob` (evaluated only when needed).
	  * The fallback is now really `ob`; it used to be a hard-coded `None`.
	  */
	def orElse [B>:A](ob: => Option[B]):Option[B] = map(Some(_)).getOrElse(ob)

	/** Same as [[orElse]], written with pattern matching. */
	def orElse_1[B>:A](ob: => Option[B]):Option[B] = this match {
		case None => ob
		case _ => this
	}

	/** Keeps the value only if it satisfies `f`. */
	def filter(f:A=>Boolean):Option[A] = this match {
		case Some(get) if f(get) => this
		case _ => None
	}
}

/** An option that holds a value. */
case class Some[+A](get:A) extends Option[A]
/** The option that holds nothing. */
case object None extends Option[Nothing]

object Option{
	/** Lifts a plain function so it works on options. */
	def lift[A, B](f:A=>B):Option[A] => Option[B] = _ map f

	/** Evaluates `a` and turns any thrown `Exception` into `None`. */
	def Try[A](a: => A):Option[A] =
		try Some(a)
		catch {case _: Exception => None}

	/** Combines two options with `f`; `None` if either one is `None`. */
	def map2[A,B,C](a:Option[A], b:Option[B])(f:(A,B)=>C):Option[C] =
		a.flatMap(aa => b map(bb=>f(aa,bb)))

	/** Same as [[map2]], written as a for-comprehension (needs `yield` to produce a value). */
	def map2_1[A,B,C](a:Option[A], b:Option[B])(f:(A,B)=>C):Option[C] =
		for (aa <- a; bb <- b) yield f(aa,bb)


	/** Turns a list of options into an option of a list; `None` if any element is `None`. */
	def sequence[A](a:List[Option[A]]):Option[List[A]] = a match {
		case Nil => Some(Nil)
		case h :: t => h.flatMap( hh=> sequence(t).map(tt => hh::tt) )
	}

	/** [[sequence]] via `foldRight` and [[map2]]. */
	def sequence_1[A](a:List[Option[A]]):Option[List[A]] =
		a.foldRight[Option[List[A]]](Some(Nil))((x, y) => map2(x, y)(_::_))

	/** Maps every element with `f` and collects the results; `None` if any call returns `None`. */
	def traverse[A, B](a: List[A])(f: A => Option[B]): Option[List[B]] =
		a match {
			case Nil => Some(Nil)
			case h::t => map2(f(h), traverse(t)(f))(_ :: _)
		}


	/** [[traverse]] via `foldRight`. */
	def traverse_1[A, B](a: List[A])(f: A => Option[B]): Option[List[B]] =
		a.foldRight[Option[List[B]]](Some(Nil))((h,t) => map2(f(h),t)(_ :: _))

	/** [[sequence]] expressed as a [[traverse]] with the identity function. */
	def sequenceViaTraverse[A](a: List[Option[A]]): Option[List[A]] =
		traverse(a)(x => x)

}

/** Usage examples for `Option`: lookups, mean/variance, and lifting a function. */
object OptionTest extends App{

	case class Employee(name:String, department:String)

	/** Stub lookup that always succeeds and puts the employee in the "经理" department. */
	def lookupByName(name:String):Option[Employee] = Some(Employee(name,"经理"))

	val joeDepartment:Option[String] =
		lookupByName("Joe").map(_.department)


	/** Arithmetic mean; `None` for an empty sequence. */
	def mean(xs: Seq[Double]): Option[Double] =
		if (xs.isEmpty) None
		else Some(xs.sum / xs.length)

	/** Variance, i.e. the mean of the squared distances from the mean; `None` for an empty sequence. */
	def variance(xs: Seq[Double]): Option[Double] =
		mean(xs) flatMap (m => mean(xs.map(x => math.pow(x - m, 2))))

	// `math.abs` is overloaded, so the expected type is spelled out to select the Double version.
	val abs0: Option[Double] => Option[Double] = Option.lift(math.abs)
}

(2)Either

package cn.rectcircle.scala.fpbook.chapter04

/** A result that is either a failure ([[Left]]) or a success ([[Right]]). */
sealed trait Either[+E, +A] {
	/** Applies `f` to a success value; a failure passes through unchanged. */
	def map[B](f:A=> B):Either[E,B] = this match {
		case Left(v) => Left(v)
		case Right(v) => Right(f(v))
	}

	/** Chains a computation that may itself fail; the first failure wins. */
	def flatMap[EE >: E, B](f:A=> Either[EE, B]): Either[EE, B] = this match {
		case Left(v) => Left(v)
		case Right(v) => f(v)
	}

	/** This value if it is a success, otherwise `b` (evaluated only when needed). */
	def orElse[EE >: E, B >: A](b: =>Either[EE, B]):Either[EE, B] = this match {
		case Left(_) => b
		case Right(v) => Right(v)
	}

	/** Combines two results with `f`; fails with the first failure encountered. */
	def map2[EE >: E, B, C](b:Either[EE, B])(f:(A,B)=>C):Either[EE, C] =
		for (a <- this; aa <- b) yield f(a, aa)
}

/** The failure case. */
case class Left[+E](value:E) extends Either[E, Nothing]
/** The success case. */
case class Right[+A](value:A) extends Either[Nothing, A]

object Either {
	/** Evaluates `a` and captures any thrown `Exception` as a `Left`. */
	def Try[A](a: => A): Either[Exception, A] =
		try Right(a)
		catch { case e: Exception => Left(e) }

	/** Turns a list of results into a result of a list; fails with the first `Left`. */
	def sequence[E,A](es:List[Either[E,A]]):Either[E, List[A]] = es match {
		case Nil => Right(Nil)
		case h::t => h.flatMap(a => sequence(t).map(l => a::l))
//			for{
//				a <- h
//				l <- sequence(t)
//			} yield a :: l
	}

	/** Maps every element with `f` and collects the successes; fails with the first `Left`. */
	def traverse[E, A, B](as:List[A])(f:A=>Either[E,B]):Either[E, List[B]] = as match {
		case Nil => Right(Nil)
		case h::t => // f(h).flatMap(b => traverse(t)(f).map(l => b::l) )
				for{
					b <- f(h)
					l <- traverse(t)(f)
				} yield b :: l
	}

	/** [[traverse]] via `map2`; recurses on itself rather than delegating to `traverse`. */
	def traverse_1[E,A,B](es: List[A])(f: A => Either[E, B]): Either[E, List[B]] =
		es match {
			case Nil => Right(Nil)
			case h::t => (f(h) map2 traverse_1(t)(f))(_ :: _)
		}

	/** [[traverse]] via `foldRight`. */
	def traverse_2[E,A,B](es: List[A])(f: A => Either[E, B]): Either[E, List[B]] =
		es.foldRight[Either[E,List[B]]](Right(Nil))((a, b) => f(a).map2(b)(_ :: _))

	/** [[sequence]] expressed as a [[traverse]] with the identity function. */
	def sequence_1[E,A](es: List[Either[E,A]]): Either[E,List[A]] =
		traverse(es)(x => x)

}

/** Usage example for `Either`: a mean that explains why it failed. */
object EitherTest extends App{
	/** Arithmetic mean as a `Right`; a `Left` with a message for an empty input. */
	def mean(xs: IndexedSeq[Double]):Either[String, Double] =
		if(xs.nonEmpty) Right(xs.sum / xs.length)
		else Left("这是个空列表")

}

5、严格求值和惰性求值

(1)scala版本

package cn.rectcircle.scala.fpbook.chapter05

import Stream._

/**
  * A lazy list: both the head and the tail of a [[Cons]] cell are thunks, so
  * elements are computed only when something asks for them.
  * Companion members are referenced as `Stream.xxx` so this definition does
  * not depend on a file-level `import Stream._`.
  */
sealed trait Stream[+A]{
	/** Forces the whole stream into a list (plain recursion, not stack-safe). */
	def toList:List[A] = this match {
		case Empty => Nil
		case Cons(h, t) => h() :: t().toList
	}

	/**
	  * Tail-recursive [[toList]]. The accumulator is built back to front, so it
	  * is reversed at the end (the reversal used to be missing).
	  */
	def toList_1:List[A] = {
		@annotation.tailrec
		def go(s:Stream[A], acc:List[A]):List[A] = s match {
			case Cons(h,t) => go(t(), h() :: acc)
			case _ => acc
		}
		go(this, List()).reverse
	}

	/** [[toList]] using a local mutable buffer, avoiding the final reversal. */
	def toListFast: List[A] = {
		val buf = new collection.mutable.ListBuffer[A]
		@annotation.tailrec
		def go(s: Stream[A]): List[A] = s match {
			case Cons(h,t) =>
				buf += h()
				go(t())
			case _ => buf.toList
		}
		go(this)
	}

	/** The first `n` elements, lazily; empty for `n <= 0` (a negative `n` used to take everything). */
	def take(n: Int): Stream[A] = this match {
		case Cons(h, t) if n > 0 => Stream.cons(h(), t().take(n-1))
		case _ => Stream.empty
	}

	/**
	  * The stream without its first `n` elements. Once `n` reaches 0 the rest
	  * of the stream is returned (it used to return the empty stream in every case).
	  */
	def drop(n:Int):Stream[A] = this match {
		case Cons(_, t) if n>0 => t().drop(n-1)
		case _ => this
	}

	/** The longest prefix whose elements all satisfy `f`. */
	def takeWhile(f:A=>Boolean):Stream[A] = this match {
		case Cons(h, t) if f(h()) => Stream.cons(h(), t().takeWhile(f))
		case _ => Stream.empty
	}

	/** True if some element satisfies `p`; stops at the first match. */
	def exists(p:A=>Boolean):Boolean = this match {
		case Cons(h, t) => p(h()) || t().exists(p)
		case _ => false
	}

	/** Right fold whose combining function takes the folded tail by name, so it can stop early. */
	def foldRight[B] (z: => B)(f:(A, =>B) => B):B  =
		this match {
			case Cons(h, t) => f(h(), t().foldRight(z)(f))
			case Empty => z
		}

	/** [[exists]] via [[foldRight]]. */
	def exists_1(p:A=>Boolean):Boolean =
		foldRight(false)((a , b)=> p(a) || b )

	/** True if every element satisfies `p`; stops at the first counter-example. */
	def forAll(p:A=>Boolean):Boolean =
		foldRight(true)((a, b)=> p(a) && b)

	/** [[takeWhile]] via [[foldRight]]; `b` is already the filtered tail, so it is used as is. */
	def takeWhile_1(f:A=>Boolean):Stream[A] =
		foldRight(Stream.empty[A])((a, b) => if(f(a)) Stream.cons(a, b) else Stream.empty[A])

	/** The first element, if any, without touching the tail. */
	def headOption():Option[A] =
		foldRight(None:Option[A])((a, _) => Some(a))

	/** Lazily applies `f` to every element. */
	def map[B](f: A=>B):Stream[B] =
		foldRight(Stream.empty[B])((a, b)=> Stream.cons(f(a), b))

	/** Lazily keeps only the elements that satisfy `f`. */
	def filter(f: A => Boolean): Stream[A] =
		foldRight(Stream.empty[A])((h,t) =>
			if (f(h)) Stream.cons(h, t)
			else t)

	/** This stream followed by `s`, which is not evaluated until it is reached. */
	def append[B>:A](s: => Stream[B]): Stream[B] =
		foldRight(s)((h,t) => Stream.cons(h,t))

	/** Maps every element to a stream and concatenates the results. */
	def flatMap[B](f: A => Stream[B]): Stream[B] =
		foldRight(Stream.empty[B])((h,t) => f(h) append t)

	/** [[map]] via `unfold`. */
	def mapViaUnfold[B](f: A => B): Stream[B] =
		Stream.unfold(this) {
			case Cons(h,t) => Some((f(h()), t()))
			case _ => None
		}

	/** [[take]] via `unfold`; the state is (remaining stream, remaining count). */
	def takeViaUnfold(n: Int): Stream[A] =
		Stream.unfold((this, n)) {
			// Last wanted element: emit it without forcing the tail.
			case (Cons(h,_), 1) => Some((h(), (Stream.empty[A], 0)))
			// The guard checks the remaining count of the state, not the original `n`.
			case (Cons(h,t), n1) if n1 > 1 => Some((h(), (t(), n1-1)))
			case _ => None
		}

	/** [[takeWhile]] via `unfold`. */
	def takeWhileViaUnfold(f: A => Boolean): Stream[A] =
		Stream.unfold(this) {
			case Cons(h,t) if f(h()) => Some((h(), t()))
			case _ => None
		}

	/** Combines two streams element by element; stops at the shorter one. */
	def zipWith[B,C](s2: Stream[B])(f: (A,B) => C): Stream[C] =
		Stream.unfold((this, s2)) {
			case (Cons(h1,t1), Cons(h2,t2)) =>
				Some((f(h1(), h2()), (t1(), t2())))
			case _ => None
		}

	/** Pairs up elements; a special case of [[zipWith]]. */
	def zip[B](s2: Stream[B]): Stream[(A,B)] =
		zipWith(s2)((_,_))


	/** Pairs up elements until both streams are exhausted, padding the shorter one with `None`. */
	def zipAll[B](s2: Stream[B]): Stream[(Option[A],Option[B])] =
		zipWithAll(s2)((_,_))

	/** Like [[zipWith]], but continues until both streams are exhausted. */
	def zipWithAll[B, C](s2: Stream[B])(f: (Option[A], Option[B]) => C): Stream[C] =
		Stream.unfold((this, s2)) {
			case (Empty, Empty) => None
			case (Cons(h, t), Empty) => Some((f(Some(h()), Option.empty[B]), (t(), Stream.empty[B])))
			case (Empty, Cons(h, t)) => Some((f(Option.empty[A], Some(h())), (Stream.empty[A], t())))
			case (Cons(h1, t1), Cons(h2, t2)) => Some((f(Some(h1()), Some(h2())), (t1(), t2())))
		}
}

/** The empty stream. */
case object Empty extends Stream[Nothing]
/** A non-empty stream; `h` and `t` are thunks (use `Stream.cons` to get memoised ones). */
case class Cons[+A](h: ()=>A, t: ()=> Stream[A]) extends Stream[A]



object Stream {
	/** Smart constructor: caches head and tail in lazy vals so each is evaluated at most once. */
	def cons[A](hd: => A, tl: => Stream[A]):Stream[A] = {
		lazy val head = hd
		lazy val tail = tl
		Cons(()=>head, ()=>tail)
	}

	/** The empty stream, typed as `Stream[A]` to help type inference. */
	def empty[A]:Stream[A] = Empty

	/** Builds a stream from the given elements. */
	def apply[A](as : A*):Stream[A] = {
		if(as.isEmpty) empty else cons(as.head, apply(as.tail:_*))
	}

	/** The infinite stream 1, 1, 1, ... */
	val ones:Stream[Int] = cons(1, ones)

	/** The infinite stream a, a, a, ... */
	def constant[A](a: A):Stream[A] = cons(a, constant(a))

	/** The infinite stream n, n+1, n+2, ... */
	def from(n:Int):Stream[Int] = cons(n, from(n+1))

	/**
	  * Builds a stream from a seed: `f` returns the next element together with
	  * the next state, or `None` to end the stream.
	  */
	def unfold[A,S](z:S)(f:S=>Option[(A,S)]):Stream[A] = f(z) match {
		case Some((a, s)) => cons(a, unfold(s)(f))
		case None => empty[A]
	}

	/** [[ones]] via [[unfold]]; no state is needed, so the unit value `()` is the seed. */
	def ones_1:Stream[Int] = unfold(())(_ => Some((1, ())))

	/** [[from]] via [[unfold]]: the state is the next number to emit (it used to emit `n+1` forever). */
	def from_1(n:Int):Stream[Int] =
		unfold(n)(next => Some((next, next+1)))

	/** [[constant]] via [[unfold]]. */
	def constant_1[A](a: A):Stream[A] =
		unfold(())(_ => Some((a, ())))


}

/**
  * Demo script for by-name parameters and the lazy `Stream`. The order of the
  * printed lines is the point of the demo, so statements must stay in this order.
  * Relies on `cons`, `empty`, `ones` and `unfold` being in scope unqualified.
  */
object StreamTest extends App{

	// `i` is passed by name and referenced twice in `i+i`, so the argument block runs once per reference.
	def maybeTwice(b:Boolean, i: => Int) = if(b) i+i else 0
	maybeTwice(true, {println("hi"); 1})
	println("================")

	// Same function, but the by-name argument is cached in a lazy val before it is used twice.
	def maybeTwice2(b:Boolean, i: => Int) = {
		lazy val j = i
		if(b) j+j else 0
	}
	maybeTwice2(true, {println("hi"); 1})
	println("================")

	println(Stream(1,2,3).take(2).toList)
	println("================")

	// Each element announces itself when it is evaluated; the progress lines
	// printed between the steps show when that happens relative to map/filter/toList.
	val s =
		cons(
			{println("这是1");1},
			cons({println("这是2");2},
				cons({println("这是3");3},
					cons({println("这是4");4}, empty))))
	println("s构造完成")
	val s1 = s.map(_+10)
	println("map完成")
	val s2 = s1.filter(_%2==0)
	println("filter完成")
	println(s2.toList)
	println("原生实现如下================")

	// The same pipeline on the standard library's scala.Stream, for comparison.
	val ss = scala.Stream.fill(4){println("这是1"); 1}
	println("s构造完成")
	val ss1 = ss.map(_+10)
	println("map完成")
	val ss2 = ss1.filter(_%2==0)
	println("filter完成")
	println(ss2.toList)

	// Operations on the infinite stream `ones`; each call below has to stop
	// after a finite prefix, otherwise the script would never get past it.
	println("================")
	println(ones.take(5).toList)
	println(ones.map(_+1).exists(_ % 2 == 0))
	println(ones.takeWhile(_ == 1))
	println(ones.takeWhile(_ == 0))
	println(ones.forAll(_ != 1))

	// Fibonacci numbers via explicit recursion: 0 first, then `go` emits `b` and advances the pair.
	def fibs:Stream[Int] = {
		def go(a:Int, b:Int):Stream[Int] = cons(b, go(b, a+b))
		cons(0, go(0, 1))
	}

	println("================")
	println(fibs.take(10).toList)

	// Fibonacci numbers via unfold with the pair (current, next) as state.
	// Defined but not called anywhere in this object.
	def fibs_1:Stream[Int] =
		unfold((0, 1)){
			case (f1, f2) => Some(f1, (f2, f1+f2))
		}

	println("================")
	println(ones.takeViaUnfold(5).toList)

}

(2)java版本

package cn.rectcircle.java.fpbook.chapter05;

import cn.rectcircle.java.fpbook.base.Tuple;
import cn.rectcircle.java.fpbook.base.Tuple2;
import cn.rectcircle.java.fpbook.base.test.Lazy;
import cn.rectcircle.java.fpbook.chapter04.Option;
import cn.rectcircle.java.fpbook.chapter04.Some;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import static cn.rectcircle.java.fpbook.base.Predef.lazy;

/**
 * A lazy, possibly infinite sequence. Head and tail are exposed as
 * {@link Supplier}s so that elements are computed only when requested.
 *
 * @author Rectcircle
 * @date 17-12-11
 */
public interface Stream<A> {
	/** Shared raw-typed empty instance; use {@link #empty()} to get it with a type argument. */
	Stream empty = Empty.instance();

	boolean isEmpty();
	Supplier<A> head();
	Supplier<Stream<A>> tail();

	/**
	 * Forces the whole stream into a list.
	 * Walks the stream in a loop, so long streams neither overflow the stack
	 * nor get re-copied at every level as in the recursive version.
	 *
	 * @return a new mutable list with all elements in order
	 */
	default List<A> toList(){
		List<A> as = new LinkedList<>();
		Stream<A> rest = this;
		while(!rest.isEmpty()){
			as.add(rest.head().get());
			rest = rest.tail().get();
		}
		return as;
	}

	/** The first {@code n} elements, lazily; empty when {@code n <= 0}. */
	default Stream<A> take(int n){
		if(n <= 0 || isEmpty()) {
			return empty();
		}
		return cons(head(), () -> tail().get().take(n-1) );
	}

	/**
	 * The stream without its first {@code n} elements. Stops at the end of the
	 * stream instead of calling {@code tail()} on the empty stream, which throws.
	 */
	default Stream<A> drop(int n){
		Stream<A> rest = this;
		for(int left = n; left > 0 && !rest.isEmpty(); left--){
			rest = rest.tail().get();
		}
		return rest;
	}

	/** The longest prefix whose elements satisfy {@code f}; empty for the empty stream. */
	default Stream<A> takeWhile(Function<A, Boolean> f){
		// The emptiness check comes first: head() of the empty stream throws.
		if(!isEmpty() && f.apply(head().get())){
			return cons(head(), ()->tail().get().takeWhile(f));
		}
		return empty();
	}

	// Strict variant, kept for reference: it always folds the whole tail first.
//	default <B> B foldRight(Supplier<B> b, BiFunction<A, B, B> f){
//		if(isEmpty()){
//			return b.get();
//		}
//		return f.apply(
//				head().get(),
//				tail().get().foldRight(b, f)
//		);
//	}

	/**
	 * Lazy right fold used to build {@link #foldRight}: the folded tail is
	 * handed to {@code f} as a supplier, so {@code f} decides whether it is
	 * ever evaluated.
	 */
	default <B> Supplier<B> _foldRight(
			Supplier<B> b,
			BiFunction<A, Supplier<Supplier<B>>, Supplier<B>> f){
		if(isEmpty()){
			return b;
		}
		return f.apply(
				head().get(),
				()->tail().get()._foldRight(b, f)
		);
	}

	/** Right fold whose combining function receives the folded tail lazily. */
	default <B> B foldRight(
			B b,
			BiFunction<A, Supplier<B>, B> f){
		BiFunction<A, Supplier<Supplier<B>>, Supplier<B>> f1 =
				(a, ssb) -> () -> f.apply(a, ssb.get());
		return _foldRight(()-> b, f1).get();
	}

	/** True if some element satisfies {@code f}; stops at the first match. */
	default boolean exists(Function<A, Boolean> f){
		return foldRight(
				false,
				(a, b1)  -> f.apply(a) || b1.get());
	}


	/** True if every element satisfies {@code f}; stops at the first counter-example. */
	default boolean forAll(Function<A, Boolean> f) {
		return foldRight(
				true,
				(a, b)  -> f.apply(a) && b.get());
	}

	/** The first element, if any. */
	default Option<A> headOption(){
		Option<A> n = Option.None();
		return foldRight(n, (a,b) -> Option.Some(a));
	}

	/** Lazily applies {@code f} to every element. */
	default <B> Stream<B> map(Function<A, B> f){
		Stream<B> e= empty();
		BiFunction<A, Supplier<Stream<B>>, Stream<B>> f1 =
				(a, b) -> cons(()-> f.apply(a),  b);
		return foldRight(e, f1);
	}

	/** Lazily keeps only the elements that satisfy {@code f}. */
	default Stream<A> filter(Function<A, Boolean> f) {
		Stream<A> e= empty();
		return foldRight(
				e,
				(a, b) -> {
					if (f.apply(a)) {
						return cons(() -> a, b);
					} else {
						return b.get();
					}
				});
	}

	// Fully lazy variant, kept for reference: it does not evaluate `s` up front.
//	default Stream<A> append(Supplier<Stream<A>> s) {
//		return _foldRight(
//				s,
//				(h, t) -> () -> cons(()->h, ()->t.get().get())).get();
//	}

	/** This stream followed by {@code s}; note that {@code s.get()} is called immediately. */
	default Stream<A> append(Supplier<Stream<A>> s) {
		return foldRight(
				s.get(),
				(h, t) -> cons(()->h, t));
	}



	/**
	 * {@link #map} via {@link #unfold}. The next state is the tail of the
	 * current state {@code s}; it used to be the tail of {@code this}, which
	 * never advanced past the second element.
	 */
	default <B> Stream<B> mapViaUnfold(Function<A, B> f){
		return unfold(this, s ->{
			if(!s.isEmpty()){
				return Option.Some(Tuple.of(f.apply(s.head().get()), s.tail().get()));
			}
			return Option.None();
		});
	}

	/** Smart constructor: wraps both suppliers in {@code Lazy} before building the cell. */
	static <A> Stream<A> cons(Supplier<A> hd, Supplier<Stream<A>> tl){
		Lazy<A> head = lazy(hd);
		Lazy<Stream<A>> tail = lazy(tl);
		return new Cons<>(head::get, tail::get);
	}

	/** The shared empty stream with the requested element type. */
	@SuppressWarnings({ "unchecked"})
	static <A> Stream<A> empty(){
		return (Stream<A>)empty;
	}

	/** Stream over {@code as[i..]}; helper for the varargs factory. */
	static <A> Stream<A> apply(A[] as, int i){
		if(i == as.length){
			return empty();
		} else {
			return cons(()->as[i],()-> apply(as, i+1));
		}
	}

	/** Builds a stream from the given elements. */
	@SafeVarargs
	static <A> Stream<A> apply(A ...as){
		return apply(as, 0);
	}

	/** The infinite stream 1, 1, 1, ... */
	static Stream<Integer> ones(){
		return cons(()->1, Stream::ones);
	}

	/** The infinite stream a, a, a, ... */
	static <A> Stream<A> constant(A a) {
		return cons(() -> a, ()->constant(a));
	}

	/** The infinite stream n, n+1, n+2, ... */
	static Stream<Integer> from(Integer n){
		return cons(() -> n, ()->from(n+1));
	}

	/**
	 * Builds a stream from a seed: {@code f} returns the next element together
	 * with the next state, or an empty option to end the stream.
	 */
	static <A, S> Stream<A>  unfold(S z, Function<S, Option<Tuple2<A, S>>> f) {
		Option<Tuple2<A, S>> s = f.apply(z);
		if(s.isEmpty()){
			return empty();
		} else {
			Tuple2<A, S> t = s.get();
			return cons(()->t._1, ()->unfold(t._2,f));
		}
	}


	/** {@link #ones} via {@link #unfold}; no state is needed, so {@code null} is the seed. */
	static Stream<Integer> ones_1 (){
		return unfold(null, v -> Option.Some(Tuple.of(1, null)));
	}

	/** Fibonacci numbers via {@link #unfold} with the pair (current, next) as state. */
	static Stream<Integer> fibs_1(){
		return unfold(
				Tuple.of(0, 1),
				s -> Option.Some(Tuple.of(s._1, Tuple.of(s._2, s._1+s._2)))
		);
	}

	/**
	 * {@link #from} via {@link #unfold}: the state is the next number to emit
	 * (it used to emit {@code n+1} forever because the state was never used).
	 */
	static Stream<Integer> from_1 (Integer n){
		return unfold(n, next -> Option.Some(Tuple.of(next, next+1)));
	}

	/** {@link #constant} for integers via {@link #unfold}. */
	static Stream<Integer> constant_1 (Integer n){
		return unfold(null, v -> Option.Some(Tuple.of(n, null)));
	}


}

/**
 * The empty stream, implemented as a raw-typed singleton so that one instance
 * can stand in for an empty stream of any element type.
 */
class Empty implements Stream{
	private static final Stream INSTANCE = new Empty();
	// Private constructor: the only instance is INSTANCE.
	private Empty(){}
	/** @return the single shared instance */
	static Stream instance() {
		return INSTANCE;
	}

	@Override
	public String toString(){
		return "Empty";
	}

	@Override
	public boolean isEmpty() {
		return true;
	}

	/** Always throws: the empty stream has no head. */
	@Override
	public Supplier head() {
		throw new NoSuchElementException("head of empty stream");
	}

	/** Always throws: the empty stream has no tail. */
	@Override
	public Supplier tail() {
		throw new UnsupportedOperationException("tail of empty stream");
	}
}

/** A non-empty stream cell that stores suppliers for its first element and for the rest. */
class Cons<A> implements Stream<A> {
	private final Supplier<A> head;
	private final Supplier<Stream<A>> tail;

	Cons(Supplier<A> hd, Supplier<Stream<A>> tl){
		head = hd;
		tail = tl;
	}

	/** Shows the head (which evaluates it) and a placeholder for the tail. */
	@Override
	public String toString() {
		return new StringBuilder("Stream(")
				.append(head.get())
				.append(", ?)")
				.toString();
	}

	@Override
	public boolean isEmpty() {
		return false;
	}

	@Override
	public Supplier<A> head() {
		return this.head;
	}

	@Override
	public Supplier<Stream<A>> tail() {
		return this.tail;
	}
}

6、纯函数式状态

import State._

/**
  * A state transition: given a state `S`, `run` produces a result `A`
  * together with the next state.
  */
case class State[S, +A](run: S => (A, S)) {
	/** Transforms the result and leaves the state transition as it is. */
	def map[B](f:A=>B):State[S,B] =
		flatMap(a => State.unit(f(a)))

	/** Runs this action, then `sb` on the resulting state, and combines both results with `f`. */
	def map2[B,C](sb:State[S, B])(f:(A,B)=>C):State[S,C]=
		flatMap(a => sb.map(b => f(a, b)))

	/** Runs this action and feeds its result to `f` to decide the next action. */
	def flatMap[B](f: A => State[S, B]): State[S, B] =
		State { before =>
			run(before) match {
				case (a, after) => f(a).run(after)
			}
		}
}

object State {
	/** An action that returns `a` and does not touch the state. */
	def unit[S, A](a: A): State[S, A] =
		State(state => (a, state))

	/** Runs the actions in order and collects their results; built with `foldRight` and `map2`. */
	def sequenceViaFoldRight[S,A](sas: List[State[S, A]]): State[S, List[A]] =
		sas.foldRight(unit[S, List[A]](Nil)) { (action, rest) =>
			action.map2(rest)((a, as) => a :: as)
		}

	/** Runs the actions in order and collects their results, threading the state in a single pass. */
	def sequence[S, A](sas: List[State[S, A]]): State[S, List[A]] =
		State { (initial: S) =>
			val (reversed, last) = sas.foldLeft((List.empty[A], initial)) {
				case ((collected, current), action) =>
					val (a, next) = action.run(current)
					(a :: collected, next)
			}
			// Results were prepended, so restore the original order.
			(reversed.reverse, last)
		}

	/** Returns the current state as the result. */
	def get[S]: State[S, S] = State(state => (state, state))

	/** Replaces the state with `s`, ignoring the previous one. */
	def set[S](s: S): State[S, Unit] = State(_ => ((), s))

	/** Replaces the state with `f` applied to the current state. */
	def modify[S](f: S => S): State[S, Unit] =
		State(current => ((), f(current)))
}


/*
使用样例:模拟一个自动糖果贩售机逻辑

贩售机有两种操作方法:投入硬币和扭动出糖旋钮。

贩售机可以处于锁定和放开两种状态。
模拟运作跟踪贩售机内当前的糖果和硬币数量。
贩售机的操作逻辑要求如下:

1、如果机内有糖的话,投入硬币贩售机从锁定状态进入放开状态

2、在放开状态下扭动旋钮贩售机放出一块糖果后自动进入锁定状态

3、在锁定状态下扭动旋钮贩售机不做反应

4、在放开状态下投入硬币贩售机不做反应

5、没有糖果的贩售机对任何操作都不做反应
*/
/** The two things a customer can do to the machine. */
sealed trait Input
/** Insert a coin. */
case object Coin extends Input
/** Turn the knob. */
case object Turn extends Input

/** Machine state: whether it is locked, and how many candies and coins it holds. */
case class Machine(locked: Boolean, candies: Int, coins: Int)

object Candy {
	/**
	  * The transition rule for one input:
	  *  - a machine without candies ignores everything;
	  *  - a coin unlocks a locked machine and is added to the coins;
	  *  - a turn on an unlocked machine hands out one candy and locks it again;
	  *  - every other combination leaves the machine unchanged.
	  */
	def update:Input=>(Machine=>Machine) = input => machine =>
		machine match {
			case Machine(_, 0, _) => machine
			case Machine(true, candy, coin) if input == Coin =>
				Machine(false, candy, coin + 1)
			case Machine(false, candy, coin) if input == Turn =>
				Machine(true, candy - 1, coin)
			case _ => machine
		}

	/** Feeds all inputs to the machine in order and returns the final `(coins, candies)`. */
	def simulateMachine(inputs: List[Input]): State[Machine, (Int, Int)] = {
		val steps = inputs.map(input => modify[Machine](update(input)))
		sequence(steps).flatMap { _ =>
			get[Machine].map(machine => (machine.coins, machine.candies))
		}
	}
}

7、纯函数式的并行计算

(1)阻塞式的并行计算

package cn.rectcircle.scala.fpbook.chapter07

import java.util.concurrent.{ExecutorService, Executors, Future, TimeUnit}

import Par._

/** A small combinator library describing parallel computations that are run on an `ExecutorService`. */
object Par {
	/** A description of a computation; it starts only when applied to an executor. */
	type Par[A] = ExecutorService => Future[A]

	/** A `Future` that is already completed with `get`. */
	private case class UnitFuture[A](get: A) extends Future[A] {
		override def isDone: Boolean = true
		override def get(timeout: Long, unit: TimeUnit): A = get
		override def isCancelled: Boolean = false
		override def cancel(mayInterruptIfRunning: Boolean): Boolean = false
	}

	/**
	  * Wraps an already computed value. `a` is a strict parameter, so it is
	  * evaluated by the caller; the executor is not used.
	  */
	def unit[A](a: A):Par[A] = (_:ExecutorService) => UnitFuture(a)

	/**
	  * Combines the results of two computations with `f`.
	  * `f` itself runs on the thread that applies the resulting `Par`, not on a
	  * pool thread; wrap the call as `fork(map2(a, b)(f))` to move it to the pool.
	  */
	def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
		(es:ExecutorService) => {
			val af = a(es)
			val bf = b(es)
			UnitFuture(f(af.get, bf.get))
		}

	/**
	  * Variant of [[map2]] whose future honours `get(timeout, unit)`.
	  *
	  * The whole budget is converted to nanoseconds once; `fa` is awaited within
	  * that budget and `fb` only gets what is left. (Previously `fa` was awaited
	  * without any timeout, and converting the elapsed time back to a coarse
	  * unit truncated it.)
	  */
	def map2_Timeout[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
		es => new Future[C] {
			import TimeUnit.NANOSECONDS
			// Both computations are started right away; where they run is decided by `a` and `b`.
			val fa:Future[A] = run(a)(es)
			val fb:Future[B] = run(b)(es)
			def get = f(fa.get, fb.get)
			def get(timeOut: Long, timeUnit: TimeUnit):C = {
				val budgetNanos = timeUnit.toNanos(timeOut)
				val start = System.nanoTime
				val a = fa.get(budgetNanos, NANOSECONDS)
				val elapsedNanos = System.nanoTime - start
				val b = fb.get(budgetNanos - elapsedNanos, NANOSECONDS)
				f(a, b)
			}
			def isDone:Boolean = fa.isDone && fb.isDone
			// The combined result cannot be produced once either side has been cancelled.
			def isCancelled:Boolean = fa.isCancelled || fb.isCancelled
			def cancel(evenIsRunning: Boolean):Boolean = {
				// Cancel both sides; `||` on the calls directly would skip fb whenever fa's cancel succeeds.
				val cancelledA = fa.cancel(evenIsRunning)
				val cancelledB = fb.cancel(evenIsRunning)
				cancelledA || cancelledB
			}
		}



	/**
	  * Marks a computation to be run on the executor; nothing happens until
	  * the resulting `Par` is applied.
	  * Known limitation: the submitted task blocks its pool thread in `get`
	  * while waiting for the inner future, so a bounded pool can deadlock.
	  * An explicit `Callable` is used so the overloaded `submit` is never ambiguous.
	  */
	def fork[A](a: =>Par[A]):Par[A] =
		es => es.submit(new java.util.concurrent.Callable[A] {
			def call: A = a(es).get()
		})

	/** Defers building `fa` until an executor is supplied, without moving to another thread. */
	def delay[A](fa: =>Par[A]):Par[A] =
		es => fa(es)

//	def get[A](a:Par[A]):A

	/** Wraps an unevaluated expression so that it is evaluated through [[fork]]. */
	def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

	/** Turns `f: A => B` into a function whose result is computed through [[lazyUnit]]. */
	def asyncF[A,B](f: A => B): A => Par[B] =
		a => lazyUnit(f(a))

	/** Starts the computation on `s` and returns its future. */
	def run[A](a:Par[A])(implicit s:ExecutorService): Future[A]
		= a(s)

	// A flawed first attempt, kept for reference: with a strict `unit` and a
	// blocking `get` the two halves cannot run in parallel.
//	def sum(ints:IndexedSeq[Int]):Int =
//		if(ints.size<=1) ints.headOption getOrElse 0
//		else{
//			val (l, r) =ints.splitAt(ints.size/2)
//
//			val (sumL, sumR) = (Par.unit(sum(l)), Par.unit(sum(r)))
//			Par.get(sumL) + Par.get(sumR)
//			// inlining the two lines above gives
//			//Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))
//			// and the two sides of `+` are then evaluated one after the other
//		}
//
//	// Returning a Par[Int] instead:
//	def sum(ints:IndexedSeq[Int]):Par[Int] =
//		if(ints.size<=1) unit(ints.headOption getOrElse 0)
//		else{
//			val (l, r) =ints.splitAt(ints.size/2)
//			Par.map2(sum(l), sum(r))(_+_)
//		}
//

	/** Enables infix syntax such as `p.map2(p2)(f)`. */
	implicit class ParOps[A](p: Par[A]) {
		// Add further infix methods here.
		def map2[B,C](b: Par[B])(f: (A,B) => C): Par[C] =
			Par.map2(p, b)(f)
	}

	/** Sorts the list produced by `parList`. */
	def sortPar(parList:Par[List[Int]]):Par[List[Int]] =
		map2(parList, unit(()))((a, _)=>a.sorted)

	/** Applies `f` to the result of `pa`; expressed as a [[map2]] with a dummy second argument. */
	def map[A, B](pa:Par[A])(f:A=>B):Par[B] =
		map2(pa, unit(()))((a, _)=> f(a))

	/** [[sortPar]] via [[map]]. */
	def sortPar_1(parList:Par[List[Int]]):Par[List[Int]] =
		map(parList)(a=>a.sorted)

	/** Applies `f` to every element, each application being forked separately. */
	def parMap[A,B](ps:List[A])(f:A=>B):Par[List[B]] = fork{
		val fbs:List[Par[B]] = ps.map(asyncF(f))
		sequence(fbs)
	}

	/** Turns a list of computations into one computation of a list, keeping the order. */
	def sequence[A](ps:List[Par[A]]):Par[List[A]] =
		ps.foldRight(unit(List[A]()))((p, acc)=> map2(p, acc)(_::_))

	/** Keeps the elements that satisfy `f`, each test being forked separately. */
	def parFilter[A](as:List[A])(f:A=>Boolean):Par[List[A]] = {
		val pars: List[Par[List[A]]] =
			as map asyncF((a: A) => if (f(a)) List(a) else List())
		map(sequence(pars))(_.flatten)
	}

	/** Runs both computations on `e` and compares their results; used to check algebraic laws of the API. */
	def equal[A](p: Par[A], p2: Par[A])(implicit e: ExecutorService): Boolean =
		p(e).get == p2(e).get

}




/**
  * Demo script for `Par`. The "expected output" notes below are the original
  * author's observations and depend on thread scheduling and pool naming.
  * The statement order matters, and the script deliberately ends in a
  * deadlock, so `es.shutdown()` on the last line is never reached.
  */
object ParTest extends App{
	// Sequential sum.
	def sum(ints:Seq[Int]):Int =
		ints.foldLeft(0)((a,b)=>a+b)

	// Divide-and-conquer sum: split in half and add the two partial sums.
	def sum_1(ints:IndexedSeq[Int]):Int =
		if(ints.size<=1) ints.headOption getOrElse 0
		else{
			val (l, r) =ints.splitAt(ints.size/2)
			sum_1(l) + sum_1(r)
		}

	// Executor shared by the examples below.
	implicit val es:ExecutorService = Executors.newCachedThreadPool()
	// `unit` takes its argument by value, so this block runs right here on the
	// current thread (observed output: main).
	val p = unit{
		println(Thread.currentThread().getName)
		1
	}
	// Prints the result of the computation: 1
	println(run(p).get())
	// The forked block runs on a pool thread: observed output is the thread
	// name pool-1-thread-1 first, then the result 1.
	println(
		run(fork{
			println(Thread.currentThread().getName)
			p
		}).get()
	)

	// Prints nothing yet: lazyUnit takes its argument by name.
	val p1 = lazyUnit{
		println(Thread.currentThread().getName)
		2
	}
	// Prints main===1
	println("main===1")
	// Running p1 now evaluates the block. Observed output:
	//pool-1-thread-1
	//2
	println(
		run(p1).get
	)

	// A function that reports the thread it runs on.
	def ascFunctionTest(a:Int):Int = {
		println(Thread.currentThread().getName)
		a+1
	}
	println("main===2")
	val p2 = asyncF(ascFunctionTest)
	println(run(p2(1)).get())
	println("main===3")
	println(run(p2(2)).get())
	println("main===4")
	println(run(p2(3)).get())

	// map2: the combining function prints the name of the thread it runs on.
	println("main===5")
	val p3 = map2(p, p1)((a, b)=>{
		println(Thread.currentThread().getName)
		a+b
	})
	println(run(p3).get)


	// These two print the Future returned by run, not its value (no .get here).
	println(run(sortPar(unit(List(3,2,1)))))
	println(run(sortPar_1(unit(List(3,2,1)))))

	val p4 = parMap(List(1,2,3))(a => {
		println(Thread.currentThread().getName)
		a + 1
	})
	println(p4)
	println(run(p4).get)

	// The problem with fork: on a single-thread pool, the forked task waits for
	// an inner task that can never be scheduled.
	val a = lazyUnit(42+1)
	val S = Executors.newFixedThreadPool(1)
	println(equal(a, fork(a))(S)) // deadlocks and never returns

	es.shutdown()
}

(2)Actor的实现

package cn.rectcircle.scala.fpbook.chapter07

import java.util.concurrent.{Callable, ExecutorService, Executors}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import scala.annotation.tailrec

/**
  * A minimal actor: messages sent with `!` are appended to a linked queue built
  * from atomic references, and `handler` is invoked for them in queue order by a
  * processing run that is started through `strategy`.
  *
  * @param strategy used to start each processing run (see `schedule`)
  * @param handler  invoked once for every message
  * @param onError  invoked with anything thrown by `handler`; rethrows by default
  */
final class Actor[A](strategy: Strategy)
                    (handler: A => Unit,
                     onError: Throwable => Unit = throw _) { self =>
	// Consumer end of the queue: the node whose successor holds the next message
	// to handle. Starts out as an empty placeholder node.
	private val tail = new AtomicReference(new Node[A]())
	// Scheduling flag: 1 means no processing run is pending; 0 means a run has
	// already been scheduled (or is executing).
	private val suspended = new AtomicInteger(1)
	// Producer end of the queue: the most recently enqueued node. Initially the
	// same placeholder node as `tail`.
	private val head = new AtomicReference(tail.get)

	/** Sends message `a` to this actor (`apply` is an alias for this method). */
	def !(a: A): Unit = {
		val n = new Node(a)
		// Make n the newest node, then link the previously newest node to it.
		head.getAndSet(n).lazySet(n)
		trySchedule()
	}

	/** Sends message `a` to this actor's mailbox (message queue); same as `!`. */
	def apply(a: A): Unit = {
		this ! a
	}

	/** Builds an actor accepting `B` messages that converts each one with `f` and forwards it here. */
	def contramap[B](f: B => A): Actor[B] =
		new Actor[B](strategy)((b: B) => this ! f(b), onError)

	// Starts a processing run only if none is currently pending (flag goes 1 -> 0).
	private def trySchedule(): Unit = {
		if (suspended.compareAndSet(1, 0)) schedule()
	}

	// Hands one processing run to the strategy; the thunk the strategy returns is not used.
	private def schedule(): Unit = {
		strategy(act())
	}

	// Body of one processing run, executed wherever the strategy runs it.
	private def act(): Unit = {
		val t = tail.get
		val n = batchHandle(t, 1024)
		if (n ne t) { // at least one message was handled in this run
			// n becomes the new placeholder: drop its payload reference and advance tail.
			n.a = null.asInstanceOf[A]
			tail.lazySet(n)
			// Start another run to look for further messages.
			schedule()
		} else { // nothing was handled: the queue looked empty
			// A message may have been enqueued in the meantime, so reset the flag first...
			suspended.set(1)
			// ...and then re-check, scheduling again if a successor has appeared.
			if (n.get ne null) trySchedule()
		}
	}

	// Handles queued messages starting from the successor of `t`, calling `handler`
	// on each and routing anything it throws to `onError`. Recursion stops when the
	// queue runs out or the budget `i` is used up.
	// Returns the last node whose message was handled, or `t` itself if there was none.
	@tailrec
	private def batchHandle(t: Node[A], i: Int): Node[A] = {
		val n = t.get
		if (n ne null) {
			try {
				handler(n.a) // invoke the user-supplied message handler
			} catch {
				case ex: Throwable => onError(ex)
			}
			if (i > 0) batchHandle(n, i - 1) else n
		} else t
	}
}

object Actor {

	/**
	  * Creates an `Actor` whose processing runs are submitted to the given `ExecutorService`.
	  *
	  * @param es      executor wrapped via `Strategy.fromExecutorService`
	  * @param handler invoked for every message
	  * @param onError invoked with anything the handler throws; rethrows by default
	  */
	def apply[A](es: ExecutorService)(handler: A => Unit, onError: Throwable => Unit = throw _): Actor[A] = {
		val executorStrategy = Strategy.fromExecutorService(es)
		new Actor[A](executorStrategy)(handler, onError)
	}
}

// Queue node used by Actor: `a` is the message payload (defaults to null cast to A),
// and the inherited AtomicReference holds the reference to the next node.
private class Node[A](var a: A = null.asInstanceOf[A]) extends AtomicReference[Node[A]]

/**
  * A way of evaluating an expression: `apply` takes the expression by name and
  * returns a thunk that yields its value.
  */
trait Strategy {
	def apply[A](a: => A): () => A
}

// Ready-made strategies
object Strategy {

	/**
	  * Builds a `Strategy` from any `ExecutorService`: the expression is wrapped in a
	  * `Callable` and submitted to the executor; the returned thunk calls `get` on the
	  * resulting future.
	  */
	def fromExecutorService(es: ExecutorService): Strategy = new Strategy {
		def apply[A](a: => A): () => A = {
			val task = new Callable[A] {
				override def call(): A = a
			}
			val pending = es.submit(task)
			() => pending.get
		}
	}

	/**
	  * A `Strategy` that evaluates its argument right away in the calling thread;
	  * the returned thunk just hands back the already-computed value.
	  */
	def sequential: Strategy = new Strategy {
		def apply[A](a: => A): () => A = {
			val computed = a
			() => computed
		}
	}
}

// Demo: creates an actor that prints every message it receives, sends it two
// messages, then shuts the executor down.
object ActorTest extends App{
	// Single-threaded pool on which the actor's processing runs are submitted
	implicit val es:ExecutorService = Executors.newFixedThreadPool(1)

	// Actor whose handler prints each message
	val a = Actor[String](es){
		msg => println(msg)
	}

	a ! "123"
	a ! "456"

	// NOTE(review): shutdown is requested without waiting for the messages to be handled.
	// The actor re-submits its processing run to the executor after handling a batch, and
	// a submit after shutdown is presumably rejected — so in a narrow race "456" might
	// never be printed. Confirm whether the demo should wait before shutting down.
	es.shutdown()
}

(3)非阻塞式的并行计算

package cn.rectcircle.scala.fpbook.chapter07

import java.util.concurrent.{Callable, CountDownLatch, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicReference

/**
  * Callback-based version of the `Par` API: a `Par[A]` is a function from an
  * `ExecutorService` to a `Future[A]`, and a `Future[A]` delivers its value by
  * invoking a callback. Only `run` waits for a result.
  */
object Nonblocking {

	// A custom Future used in place of Java's Future
	sealed trait Future[A] {
		// This method is side-effecting, but it is only visible locally
		// (inside the chapter07 package), so it does not affect the purity of the functional API.
		// It registers the callback k that receives the result.
		private[chapter07] def apply(k: A => Unit): Unit
	}
	type Par[A] = ExecutorService => Future[A]

	// Runs p and blocks the calling thread until its result has been delivered.
	// NOTE(review): nothing here handles failures — if the callback is never invoked
	// (e.g. because the computation throws), await presumably never returns. Confirm.
	def run[A](p:Par[A])(implicit es:ExecutorService):A = {
		// Thread-safe holder for the result
		val ref = new AtomicReference[A]
		// latch.await() blocks the current thread
		// until latch.countDown() has been called once
		val latch = new CountDownLatch(1)
		// Start the Par; the callback stores the result and releases the latch
		p(es) { a =>
			ref.set(a)
			latch.countDown()
		}
		latch.await()
		ref.get()
	}

	// Wraps an already-computed value: the callback is invoked directly, on whichever
	// thread starts the Future (the original note says: the main thread, when run is called)
	def unit[A](a: A): Par[A] = _ =>
		new Future[A] {
			override def apply(cb: A => Unit): Unit =
				cb(a)
		}


	// Submits the by-name block r to the executor; the java Future returned by submit is discarded
	def eval(es: ExecutorService)(r: => Unit):Unit =
		es.submit(new Callable[Unit] {
			override def call(): Unit = r
		})
	// Defers a: when the resulting Future is started, evaluating a and running it
	// with the callback is submitted to the executor through eval
	def fork[A](a: => Par[A]):Par[A] =
		es => new Future[A] {
			override def apply(cb: A => Unit): Unit =
				eval(es)(a(es)(cb))
		}

	// Combines the results of p and p2 with f. Both are started, and an Actor
	// collects whichever result arrives first until the other one shows up.
	def map2[A,B,C](p: Par[A], p2: Par[B])(f: (A,B) => C): Par[C] =
		es => new Future[C] {
			override def apply(cb: C => Unit): Unit = {
				// Hold the two results until both are available
				var ar: Option[A] = None
				var br: Option[B] = None
				// this implementation is a little too liberal in forking of threads -
				// it forks a new logical thread for the actor and for stack-safety,
				// forks evaluation of the callback `cb`
				// If a's result arrives first, it is stored and we wait for b
				// If b's result arrives first, it is stored and we wait for a
				val combiner = Actor[Either[A,B]](es) {
					case Left(a) =>
						if (br.isDefined) eval(es)(cb(f(a,br.get)))
						else ar = Some(a)
					case Right(b) =>
						if (ar.isDefined) eval(es)(cb(f(ar.get,b)))
						else br = Some(b)
				}
				// Start both computations; each sends its result to the combiner
				p(es)(a => combiner ! Left(a))
				p2(es)(b => combiner ! Right(b))
			}
		}

	// specialized version of `map`
	// Runs p, then submits applying f and invoking the callback through eval
	def map[A,B](p: Par[A])(f: A => B): Par[B] =
		es => new Future[B] {
			def apply(cb: B => Unit): Unit =
				p(es)(a => eval(es) { cb(f(a)) })
		}

	// The following definitions are copied from Par.scala
	// unit wrapped in fork, so a is not evaluated when lazyUnit is called
	def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

	// Lifts f into a function that returns its result as a Par, via lazyUnit
	def asyncF[A,B](f: A => B): A => Par[B] =
		a => lazyUnit(f(a))

	// Combines the head with the forked sequencing of the tail.
	// Note that the tail is handled by `sequence`, not by sequenceRight itself.
	def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
		as match {
			case Nil => unit(Nil)
			case h :: t => map2(h, fork(sequence(t)))(_ :: _)
		}

	// Sequences by splitting the input in half recursively and concatenating
	// the results of both halves with map2; each level is wrapped in fork
	def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
		if (as.isEmpty) unit(Vector())
		else if (as.length == 1) map(as.head)(a => Vector(a))
		else {
			val (l,r) = as.splitAt(as.length/2)
			map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
		}
	}

	// List-based sequence, implemented on top of sequenceBalanced
	def sequence[A](as: List[Par[A]]): Par[List[A]] =
		map(sequenceBalanced(as.toIndexedSeq))(_.toList)

//	def sequence[A](ps:List[Par[A]]):Par[List[A]] =
//		ps.foldRight(unit(List[A]()))((p, acc)=> map2(p, acc)(_::_))

	// Parallelises mapping f over the whole list
	def parMap[A,B](as: List[A])(f: A => B): Par[List[B]] =
		sequence(as.map(asyncF(f)))

	// IndexedSeq variant of parMap that uses sequenceBalanced directly
	def parMap_1[A,B](as: IndexedSeq[A])(f: A => B): Par[IndexedSeq[B]] =
		sequenceBalanced(as.map(asyncF(f)))


	// Small demo: runs unit, fork and parMap on a two-thread pool and prints the results
	def main(args: Array[String]): Unit = {
		// Test setup
		implicit val es:ExecutorService = Executors.newFixedThreadPool(2)

		// Declared with def, so the block (including the println) is re-evaluated
		// every time p is referenced
		def p = unit{
			println(Thread.currentThread().getName)
			1
		}
		println("===test unit run")
		println(run(p))

		println("===test fork")
		println(run(fork(p)))
		println("after test fork===")

		val p1 = parMap(List.range(1, 100))(a =>{
			println(Thread.currentThread().getName)
			math.sqrt(a)
		})
		println(run(p1))
		es.shutdown()
	}
}

8、基于性质的测试

package cn.rectcircle.scala.fpbook.chapter08

import cn.rectcircle.scala.fpbook.chapter05.Stream
import cn.rectcircle.scala.fpbook.chapter06.{RNG, State}
import cn.rectcircle.scala.fpbook.chapter08.Gen.choose
import cn.rectcircle.scala.fpbook.chapter08.Prop._


//trait Prop {
//	def check:Either[(FailedCase, SuccessCount), SuccessCount]
//	//Exercise 8.3
////	def &&(p:Prop): Prop = Prop(check && p.check)
//}

/**
  * A property: `run` takes the number of test cases and a random generator
  * and reports whether the property passed or was falsified.
  */
case class Prop(run:(TestCases, RNG) => Result){

	// Exercise 8.9
	/** Conjunction: `p` is only run when this property is not falsified; otherwise this result is returned. */
	def &&(p:Prop):Prop = Prop { (n, rng) =>
		val first = this.run(n, rng)
		if (first.isFalsified) first else p.run(n, rng)
	}

	/**
	  * Disjunction: `p` is only run when this property is falsified. If `p` is
	  * falsified too, both failure messages are joined with a newline and `p`'s
	  * success count is reported.
	  */
	def ||(p:Prop):Prop = Prop { (n, rng) =>
		this.run(n, rng) match {
			case Falsified(firstMsg, _) =>
				p.run(n, rng) match {
					case Falsified(secondMsg, count) => Falsified(firstMsg + "\n" + secondMsg, count)
					case other => other
				}
			case notFalsified => notFalsified
		}
	}

}


/** Result types and constructors for [[Prop]]. */
object Prop {
	type FailedCase = String
	type SuccessCount = Int
	type TestCases = Int

	// Type recording the outcome of a test run
	sealed trait Result {
		def isFalsified:Boolean
	}
	case object Passed extends Result{
		override def isFalsified: Boolean = false
	}
	// `failure` describes the failing case, `successes` is how many cases passed before it
	case class Falsified(failure:FailedCase, successes:SuccessCount) extends Result{
		override def isFalsified: Boolean = true
	}

	/** Stream of values obtained by repeatedly running `g`'s sample, threading the generator state. */
	def randomStream[A](g: Gen[A])(rng: RNG): Stream[A] =
		Stream.unfold(rng)(state => Some(g.sample.run(state)))

	/**
	  * Builds a property that checks `f` against `n` values generated by `as`.
	  * The first value for which `f` returns false or throws an `Exception`
	  * falsifies the property; otherwise it passes.
	  */
	def forAll[A](as: Gen[A])(f: A => Boolean): Prop = Prop { (n, rng) =>
		// Pair every generated value with its index in the stream.
		val indexedSamples = randomStream(as)(rng).zip(Stream.from(0)).take(n)
		val outcomes = indexedSamples.map { case (a, i) =>
			// On failure, record the failing case together with the number of earlier successes.
			try {
				if (f(a)) Passed else Falsified(a.toString, i)
			} catch {
				// An exception thrown by the predicate is reported as a failure as well.
				case e: Exception => Falsified(buildMsg(a, e), i)
			}
		}
		outcomes.find(_.isFalsified).getOrElse(Passed)
	}

	/** Formats the failing test case together with the exception message and stack trace. */
	def buildMsg[A](s: A, e: Exception): String = {
		val trace = e.getStackTrace.mkString("\n")
		s"test case: $s\n" +
			s"generated an exception: ${e.getMessage}\n" +
			s"stack trace:\n $trace"
	}

	// Demo: checks that reversing a list twice gives back the original list
	def main(args: Array[String]): Unit = {
		val fiveInts = Gen.listOfN(5, choose(0, 100))
		val reverseTwice = forAll(fiveInts)(ns => ns.reverse.reverse == ns)
		val outcome = reverseTwice.run(5, RNG.SimpleRNG(1))
		println(outcome)
	}
}

/** A generator of `A` values, represented as a state transition over `RNG`. */
case class Gen[A](sample:State[RNG, A]){
	/** Transforms every generated value with `f`. */
	def map[B](f: A => B): Gen[B] = {
		val mapped = sample.map(f)
		Gen(mapped)
	}

	// Exercise 8.6
	/** Generates a value, then continues with the generator that `f` builds from it. */
	def flatMap[B](f:A=>Gen[B]):Gen[B] =
		Gen(sample.flatMap { a =>
			val next = f(a)
			next.sample
		})

	/** Lists of exactly `size` values drawn from this generator. */
	def listOfN(size:Int):Gen[List[A]] =
		Gen.listOfN(size, this)

	/** Lists whose length is itself drawn from the `size` generator. */
	def listOfN(size: Gen[Int]): Gen[List[A]] =
		size.flatMap((n: Int) => listOfN(n))

}

/** Constructors and combinators for `Gen`. */
object Gen{
	// Exercise 8.4
	/**
	  * Generator of integers in the half-open range `[start, stopExclusive)`.
	  *
	  * @param start         smallest value that can be produced (inclusive)
	  * @param stopExclusive upper bound (exclusive); must be greater than `start`
	  * @throws IllegalArgumentException if `start >= stopExclusive`
	  */
	def choose(start:Int, stopExclusive:Int):Gen[Int] = {
		// An empty range would divide by zero when sampled, and a reversed range would
		// silently produce values outside the requested bounds, so reject both up front.
		require(start < stopExclusive,
			s"choose requires start < stopExclusive, but got start=$start, stopExclusive=$stopExclusive")
		// Width computed as Long so that very wide ranges do not overflow Int.
		val width = stopExclusive.toLong - start.toLong
		Gen(State(RNG.nonNegativeInt))
			.map(n => (start + n % width).toInt)
	}

	// Exercise 8.5
	/** Generator that always yields `a`. */
	def unit[A](a: =>A):Gen[A] = Gen(State.unit(a))
	/** Generator of booleans, backed by `RNG.boolean`. */
	def boolean:Gen[Boolean] = Gen(State(RNG.boolean))
	/** Generator of lists containing `n` values drawn from `g`. */
	def listOfN[A](n:Int, g: Gen[A]):Gen[List[A]] =
		Gen(State.sequence(List.fill(n)(g.sample)))

	// Exercise 8.7
	/** Draws from `g1` or `g2`, chosen by a generated boolean (`g1` when it is true). */
	def union[A](g1:Gen[A], g2:Gen[A]):Gen[A] =
		boolean.flatMap(if(_) g1 else g2)

	// Exercise 8.8
	/**
	  * Draws from `g1` or `g2` according to their weights (absolute values are used).
	  * If both weights are zero the threshold is NaN and `g2` is always chosen.
	  */
	def weighted[A](g1: (Gen[A],Double), g2: (Gen[A],Double)): Gen[A] = {
		/* The probability we should pull from `g1`. */
		val g1Threshold = g1._2.abs / (g1._2.abs + g2._2.abs)

		Gen(State(RNG.double).flatMap(d => if (d < g1Threshold) g1._1.sample else g2._1.sample))
	}


	// Demo: builds a generator of 5-element integer lists and prints the generator itself
	def main(args: Array[String]): Unit = {
		val intList = Gen.listOfN(5, choose(0, 100))
		println(intList)
	}
}