在项目中看到一段代码,大意如下:
import scala.concurrent._
import ExecutionContext.Implicits.global
object QueueProcessor {
def listen() {
while(true) {
val msg = queue.next()
processMessage(msg) match {
case Success(_) =>
case _ =>
}
}
}
}
object Main extends App {
Future {
QueueProcessor.listen()
}
// do other things
}
可以看到,QueueProcessor.listen()
是一个无限循环的任务,我们在Future {}
调用它,它将会在另一个线程中执行。
我觉得这种做法不对,原因如下:
Future
通常是用来做延时计算的。当我们使用Future
时,通常会预期该任务将在未来某个时间点返回一个值。把一个无限循环的任务放进去让人感觉有点意外。
ExecutionContext.Implicits.global
默认提供的线程池中的线程是有限的(默认等于cpu核数),长期占用会对其它任务产生影响甚至导致它们无法执行。
比如,我们的程序部署在一个cpu为一个核的虚拟机上,上面的代码将导致Future中其它的任务没有机会执行,所以我在项目中同时发现了以下代码:
System.setProperty("scala.concurrent.context.numThreads", "10")
System.setProperty("scala.concurrent.context.maxThreads", "10")
它用来调整ExecutionContext.Implicits.global
线程值的大小。
import scala.concurrent._
import ExecutionContext.Implicits.global
println("Your cpu cores: " + Runtime.getRuntime.availableProcessors)
0 to 10 foreach { i =>
Future {
while(true) {
println(i)
Thread.sleep(10000);
}
}
}
在我的电脑上运行如下:
You cpu cores: 8
0
1
2
3
4
5
6
7
可见前8个任务把Future的池全占满了,后面的任务就没有办法执行了。
最后还是改成java中的传统写法:
object Main extends App {
new Thread(new Runnable() {
override def run: Unit = QueueProcessor.listen()
}).start()
// do other things
}