一个Java ThreadPool的问题

n
nowwhat2012
楼主 (未名空间)

现有一个计算量比较重的Java任务。在一个8 core的计算机上,为了加快这个程序的运行,用ThreadPool创建了9个Java线程;每个线程/Donkey做一小部分计算。(程序片段如下,Donkey是子线程。)

long startTime = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool (9);
for (int i=0; i < 9; i++) {
Donkey oneDonkey = …;
executor.execute (oneDonkey);
}

executor.shutdown();
while (!executor.isTerminated()) {}
long endTime = System.currentTimeMillis();

为了跟踪线程的计算速度,在每个Donkey线程里面测量了一下ThreadCpuTime (用
ThreadMXBean.getThreadCpuTime(thr.getId()测的) 和 wall clock time (用System.nanoTime()测两次, 然后计算差)。 发现这两个时间差别很大, 有4倍的差别。(
每个线程的wall clock time和上面程序里面的(endTime-startTime)差不多。)

感觉这些个线程并没有被分配到不同CPU core 上做并行计算。

上面的这种ThreadPool的用法有上面错误吗?

谢谢!

f
fantasist

先跟单线程跑的时间比较一下?

s
sanwadie

Parallelism 和 Concurrency 是有区别的。

你的ThreadPool 大于 cpu core (我假设是logic core,不能再分了),必然会有不
同的thread 分享同一个core。而且,系统上面不止一个Java Process,同时有其他的
Thread,所以你的这个结果是必然的。
n
nowwhat2012

单线程的状态下跑过: 耗时 0.935 seconds

然后用9个线程(机器有8个 logic cores)合作做同样的计算(每个线程做一部分),每个线程的 CPU time
Thread pool-2-thread-9: 0.308784606 seconds
Thread pool-2-thread-8: 0.303416485 seconds
Thread pool-2-thread-7: 0.282876124 seconds
Thread pool-2-thread-2: 0.310617455 seconds
Thread pool-2-thread-1: 0.312345899 seconds
Thread pool-2-thread-6: 0.306104829 seconds
Thread pool-2-thread-5: 0.278591632 seconds
Thread pool-2-thread-4: 0.292591922 seconds
Thread pool-2-thread-3: 0.290900374 seconds

但是等所有的线程返回的时候,wall clock 耗时: 1.33 seconds (比单线程还慢)。

(很奇怪的是在每一个线程里面测试wall clock 的时候, 耗时也要 1.x seconds.
不是很清楚为什么在每一个线程里面的 cpu time 和 wall clock time 有这么大的差
异。)

【 在 fantasist (一) 的大作中提到: 】
: 先跟单线程跑的时间比较一下?

n
nowwhat2012

机器的CPU有八个logic cores.

在Java里面, 我也试过用两个,三个,四个, 五个, 六个线程的情形。 结果差不多: 如果比较wall clock time的话,多线程反而都比单线程慢。但是去单独量每个线程的CPU time的时候,那个时间又确实是小了很多。

所以我怀疑我的程序弄错了什么。

(别人的基于C/C++的程序,做同样的计算,在运用multi core的时候, 确实是快了很多【在这个同样的8 core 的机器上比不用multi core的时候快了4倍】。)

【 在 sanwadie (三娃爹) 的大作中提到: 】
: Parallelism 和 Concurrency 是有区别的。
: 你的ThreadPool 大于 cpu core (我假设是logic core,不能再分了),必然会有不
: 同的thread 分享同一个core。而且,系统上面不止一个Java Process,同时有其他的
: Thread,所以你的这个结果是必然的。

p
pptwo

别人能做出来你不能,不用怀疑肯定有问题,贴程序吧。

【 在 nowwhat2012 (Judgment  day) 的大作中提到: 】
: 机器的CPU有八个logic cores.
: 在Java里面, 我也试过用两个,三个,四个, 五个, 六个线程的情形。 结果差不多
: : 如果比较wall clock time的话,多线程反而都比单线程慢。但是去单独量每个线程
: 的CPU time的时候,那个时间又确实是小了很多。
: 所以我怀疑我的程序弄错了什么。
: (别人的基于C/C++的程序,做同样的计算,在运用multi core的时候, 确实是快了很
: 多【在这个同样的8 core 的机器上比不用multi core的时候快了4倍】。)

s
szcooutt

这个数据是正确的啊。你9个线程,假设都是run concurrently,也就是你花0.
312345899 (花最多时间那个线程决定了你的total runtime) 就完成了 VS 你单线程花0.935.

除非你的线程不是 run concurrently

【 在 nowwhat2012 (Judgment  day) 的大作中提到: 】
: 单线程的状态下跑过: 耗时 0.935 seconds
: 然后用9个线程(机器有8个 logic cores)合作做同样的计算(每个线程做一部分),每
: 个线程的 CPU time
: Thread pool-2-thread-9: 0.308784606 seconds
: Thread pool-2-thread-8: 0.303416485 seconds
: Thread pool-2-thread-7: 0.282876124 seconds
: Thread pool-2-thread-2: 0.310617455 seconds
: Thread pool-2-thread-1: 0.312345899 seconds
: Thread pool-2-thread-6: 0.306104829 seconds
: Thread pool-2-thread-5: 0.278591632 seconds
: ...................

w
wdong

我不熟悉java,但我猜你的问题出在了executor.shutdown这一行。如果你
实在没有办法了可以试试下面的改法。

executor.execute应该返回的是一个future. 你把9个future存在一个数组里。
然后原来executor.shutdown那行换成一个循环挨个get这些future。
完了以后直接shutdownNow。

你对thread pool executor的理解有点不对。一般来说,如果你的thread
pool里有9个线程,你的donkey个数应该远远大于9个。你这个9是通过你有
8个cpu core得出来的,和你的问题无关。你问题本身应该还有一个可以分割
的程度,这个数字如果也是9,那就太巧合了。

【 在 nowwhat2012 (Judgment  day) 的大作中提到: 】
: 现有一个计算量比较重的Java任务。在一个8 core的计算机上,为了加快这个程序的运
: 行,用ThreadPool创建了9个Java线程;每个线程/Donkey做一小部分计算。(程序片段
: 如下,Donkey是子线程。)
: long startTime = System.currentTimeMillis();
: ExecutorService executor = Executors.newFixedThreadPool (9);
: for (int i=0; i < 9; i++) {
: Donkey oneDonkey = …;
: executor.execute (oneDonkey);
: }
: executor.shutdown();
: ...................

n
nowwhat2012

别人的程序是C/C++写的(那个是一个很成熟的库)。

现在我的程序是Java的(选择Java有别的原因)。我更倾向认为我的这个问题不应当是Java的原因。

【 在 pptwo (pp) 的大作中提到: 】
: 别人能做出来你不能,不用怀疑肯定有问题,贴程序吧。

n
nowwhat2012

这个是我没有解释清楚。

再来。

假设这个大任务是A。

(1) 在Java里面,用单线程来完成A,时间是0.935 seconds

(2) 在Java里面,用多线程 ThreadPool 9个线程来完成(computer has 8 logic
cores), 每个线程完成 1/9*A 的任务。在每个线程里面(完成任务后,返回前)统计 cpu Time, 大约是 0.308784606 seconds。 这个大体合理。

在每个线程里面(完成任务后,返回前)统计 wall clock time, 大约是 1.x seconds。 这个不合理。也不知道为什么。

在所有9个线程返回后,统计wall clock time, 大约是 1.33 seconds。这个既合理,
也不合理。合理的地方是既然每个线程的wall clock time已经是 1.x seconds, 最后
的时间是1.3 也没有什么。不合理的地方是每个线程的cpu Time是0.308784606
seconds; 如果他们是并行(on different cores)的, 那么最后的时间应当是0.4x
seconds.如果是这个数据, 我也认了。

等会, 我把完整的code 清理一下, 再贴出来

【 在 szcooutt (szcooutt) 的大作中提到: 】
: 这个数据是正确的啊。你9个线程,假设都是run concurrently,也就是你花0.
: 312345899 (花最多时间那个线程决定了你的total runtime) 就完成了 VS 你单线程
: 花0.935.
: 除非你的线程不是 run concurrently

n
nowwhat2012

谢谢。 我待会去试试。

》 如果你的thread pool里有9个线程,你的donkey个数应该远远大于9个。

我也试过16和32线程。结果类似。

我再试试。 然后把更完整的code清一清再贴出来。

【 在 wdong (万事休) 的大作中提到: 】
: 我不熟悉java,但我猜你的问题出在了executor.shutdown这一行。如果你
: 实在没有办法了可以试试下面的改法。
: executor.execute应该返回的是一个future. 你把9个future存在一个数组里。
: 然后原来executor.shutdown那行换成一个循环挨个get这些future。
: 完了以后直接shutdownNow。
: 你对thread pool executor的理解有点不对。一般来说,如果你的thread
: pool里有9个线程,你的donkey个数应该远远大于9个。你这个9是通过你有
: 8个cpu core得出来的,和你的问题无关。你问题本身应该还有一个可以分割
: 的程度,这个数字如果也是9,那就太巧合了。

s
sanwadie

我感觉代码没问题,但你的测试和对thredpool的理解有些偏差。建议这样测试:

fixed threadpool 设置为 n = logical core - 1,
线程数需要远大于 n,但小于 maxpool#,否则会报错,然后运行并和单线程比较一下
。单线程运行时间最好在60秒以上,这样可以排除一下代码那些无谓的sleep时间的影响

我记忆里使用thread pool 高并发的效果是很直接的。
a
americastone

CpuTime 应该是thread non-wait state elapses time. 如果你的thread 有处于过
wait state,那么cpuTime就会小于你所谓的wall clock time. 你的runable or
callable 有shared object吗?有可能有lock 或wait set情况
b
baskervilles

既然可以重现,就不用猜,用VisualVM随便看看thread view里到底thread怎么跑的,
或者每个thread print start & end nano
a
americastone

如果你的job里面有sleep大部分测试code会用,那么sleep不会浪费cpu cycle(cpu
time 不会被计算)
n
nowwhat2012

这个附件里是线程程序

n
nowwhat2012

这个附件里是创建ThreadPool的程序

n
nowwhat2012

这个附件是DonkeyManager运行在8-core的计算机上的结果(除了这个程序,机器CU 99.9% idle)。

Donkey线程里面没有shared objects。 但是它的线程Wall clock time 总是cpu time 的大约三倍多。

【 在 nowwhat2012 (Judgment  day) 的大作中提到: 】
: 这个附件里是创建ThreadPool的程序

c
catfishing

这个问题应该用多进程(multi processing) 在多核上运行,可以提高速度!
而不是用多线程!区别大大的........................

a
americastone

可能这个不会影响最终结果,不过你用shutdown 和 isTerminated 去确保job完成感觉有点难过,excute return future 你可以把future 存起来用loop futures 并且get() block直到完成
p
pptwo

你把关键部分都省略了这能看出个啥 ...
【 在 nowwhat2012 (Judgment  day) 的大作中提到: 】
: 这个附件里是线程程序

p
pptwo

你以为是在写python跟javascript吗
【 在 catfishing (catfish) 的大作中提到: 】
: 这个问题应该用多进程(multi processing) 在多核上运行,可以提高速度!
: 而不是用多线程!区别大大的........................

w
walkrandom

没加锁
data racing


听楼上一句话
还是多进程吧
s
sanwadie

我觉得你的测试只能说明,你的Java thread没有获得最大并行,这个完全可能,但并
不说明线程没起作用。

每个thread实际使用cpu 0.3 秒,9 个 线程如果没有任何并行,是2.7 秒
每个thread的wallclock 1 秒左右,如果没有任何并行,是 9 秒
请测试一下,在使用线程的情况下,整个程序从开始到全部结束,使用了多少时间。

g
guvest

我也觉得是这个原因。

【 在 walkrandom(walkrandom) 的大作中提到: 】

: 没加锁

: data racing

: 惨

: 听楼上一句话

: 还是多进程吧

n
nowwhat2012

谢谢楼上的各位。

应当是找到原因了: 就像“americastone (美国石头圪旦)”说的那样, 问题是出在
了那个Donkey里面的那行heavy lifting的代码部分,那部分调用了JRE/JDK带的AES实
现;结果发现那个实现不是很支持并行计算(那个实现计算正确, 但是不能充分利用
并行,不知道JRE/JDK为什么那么干)。

找到了另外一个纯Java的实现。现在Donkey线程里面的cpu time和wall clock time比
较一致了。在那台8-core的机器上用9个线程,整体速度可以提高两倍多。 这个提速还是赶不上那个c/c++库的提速3倍的效果,但是也比较接近了。

不知道用多进程会不会效果好些。暂时是不会试那个的了。

【 在 americastone (美国石头圪旦) 的大作中提到: 】
: CpuTime 应该是thread non-wait state elapses time. 如果你的thread 有处于过
: wait state,那么cpuTime就会小于你所谓的wall clock time. 你的runable or
: callable 有shared object吗?有可能有lock 或wait set情况

w
walkrandom

20个process一波带走
计算用啥多线程
x
xiaoju

楼主用java跑计算密集任务?

【 在 nowwhat2012 (Judgment  day) 的大作中提到: 】
: 现有一个计算量比较重的Java任务。在一个8 core的计算机上,为了加快这个程序的运
: 行,用ThreadPool创建了9个Java线程;每个线程/Donkey做一小部分计算。(程序片段
: 如下,Donkey是子线程。)
: long startTime = System.currentTimeMillis();
: ExecutorService executor = Executors.newFixedThreadPool (9);
: for (int i=0; i < 9; i++) {
: Donkey oneDonkey = …;
: executor.execute (oneDonkey);
: }
: executor.shutdown();
: ...................

m
magagop

搞笑,Java多進程,你是嫌JVM太小嗎?

【 在 walkrandom(walkrandom) 的大作中提到: 】

: 20个process一波带走

: 计算用啥多线程

m
magagop

Java怎麼多進程?起一大堆JVM只能更慢。這不是JavaScript。

【 在 guvest(我爱你老婆Anna) 的大作中提到: 】

: 我也觉得是这个原因。

k
kkkuai

可以有的,CPU-bound的话。

import java.util.concurrent.Executors;



public class Test {

int id, threads, counter;



public Test(int id, int threads, int counter) {

this.id = id;

this.threads = threads;

this.counter = counter;

}



public void test() {

var es = Executors.newFixedThreadPool(threads);

for (int i = 0; i < threads; ++ i) {

es.execute(()->{

long start = System.nanoTime();

for (int j = 0; j < counter; ++ j)

for (long k = 0; k < Integer.MAX_VALUE; ++ k);

long end = System.nanoTime();

System.out.println(id + " " + (end - start));

});

}

es.shutdown();

}



public static void main(String[] args) {

int cores = Runtime.getRuntime().availableProcessors();

Test t = new Test(1, 1, 1);

t.test();

t = new Test(2, 1, cores);

t.test();

t = new Test(3, cores, 1);

t.test();

}

}

【 在 nowwhat2012 (Judgment  day) 的大作中提到: 】
: 现有一个计算量比较重的Java任务。在一个8 core的计算机上,为了加快这个程序的运
: 行,用ThreadPool创建了9个Java线程;每个线程/Donkey做一小部分计算。(程序片段
: 如下,Donkey是子线程。)
: long startTime = System.currentTimeMillis();
: ExecutorService executor = Executors.newFixedThreadPool (9);
: for (int i=0; i < 9; i++) {
: Donkey oneDonkey = …;
: executor.execute (oneDonkey);
: }
: executor.shutdown();
: ...................

m
magagop

這不是多進程,多進程是fork runtime,搞多個JVM,然後所有Class和Heap都需要重新jitted,按照JVM這種幾百兆的overhead,只能更慢。除非安全考慮,我沒見過Java為
了性能搞多進程的。

多進程是low overhead語言Cpp等的專利,JS也可以cluster,但本質上還是fork clone那一套。

【 在 kkkuai (sknow) 的大作中提到: 】
: 可以有的,CPU-bound的话。
: import java.util.concurrent.Executors;
:
:
:
: public class Test {
:
: int id, threads, counter;
:
:
: ...................

k
kkkuai

狗优化得比爪哇JIT还飞快。

package main



import (

"fmt"

"math"

"runtime"

"time"

)



func test(id int, threads int, counter int) {

for i := 0; i < threads; i++ {

go func() {

start := time.Now()

for j := 0; j < counter; j++ {

for k := uint64(0); k < math.MaxInt32; k++ {

}

}

end := time.Now()

fmt.Println(id, end.Sub(start))

}()

}

fmt.Println(id, "press enter to continue if finished.")

var s string

fmt.Scanln(&s)

}



func main() {

cores := runtime.NumCPU()

test(1, 1, 1)

test(2, 1, cores)

test(3, cores, 1)

}

【 在 kkkuai (sknow) 的大作中提到: 】
: 可以有的,CPU-bound的话。
: import java.util.concurrent.Executors;
:
:
:
: public class Test {
:
: int id, threads, counter;
:
:
: ...................

p
pptwo

这种loop,稍微靠谱点的编译器都能给你优化没了。

【 在 kkkuai (sknow) 的大作中提到: 】
: 狗优化得比爪哇JIT还飞快。
: package main
: import (
: "fmt"
: "math"
: "runtime"
: "time"
: )
: func test(id int, threads int, counter int) {
: for i := 0; i < threads; i++ {
: ...................

n
netghost

这种程序的point是啥我从来是不懂的。
【 在 kkkuai (sknow) 的大作中提到: 】
: 狗优化得比爪哇JIT还飞快。
: package main
: import (
: "fmt"
: "math"
: "runtime"
: "time"
: )
: func test(id int, threads int, counter int) {
: for i := 0; i < threads; i++ {
: ...................

k
kkkuai

算个pi,java》c++(已O3)》go,可能java有个牛pow。

-------------

import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test2 {
int id, threads, counter;

public Test2(int id, int threads, int counter) {
this.id = id;
this.threads = threads;
this.counter = counter;
}

void calc() {
double pi = 0;
for (int k = 0; k < (int) 1e7; ++k) pi += Math.pow(-1, k) / (2 * k + 1);
pi *= 4;
System.out.println("pi=" + pi);
}

public void test() throws Exception {
var es = Executors.newFixedThreadPool(threads);
var futures = new Future<?>[threads];
for (int i = 0; i < threads; ++i)
futures[i] =
es.submit(
() -> {
long start = System.nanoTime();
for (int j = 0; j < counter; ++j) calc();
long end = System.nanoTime();
System.out.println(id + " " + (end - start));
});
for (int i = 0; i < threads; ++i) futures[i].get();
es.shutdown();
}

public static void main(String[] args) throws Exception {
int cores = Runtime.getRuntime().availableProcessors();
Test2 t = new Test2(1, 1, 1);
t.test();
t = new Test2(2, 1, cores);
t.test();
t = new Test2(3, cores, 1);
t.test();
}
}

------------

#include
#include
#include
#include
#include
using namespace std;

void calc() {
double pi = 0;
for (int k = 0; k < (int)1e7; ++k)
pi += pow(-1, k) / (2 * k + 1);
pi *= 4;
cout << "pi=" << pi << endl;
}

void test(int id, int threads, int counter) {
thread ts[threads];
for (int i = 0; i < threads; ++i)
ts[i] = thread(
[](int id, int threads, int counter) {
auto start = std::chrono::high_resolution_clock::now();
for (int j = 0; j < counter; ++j)
calc();
auto end = std::chrono::high_resolution_clock::now();
cout << id << ' ' << (end - start).count() << endl;
},
id, threads, counter);
for (int i = 0; i < threads; ++i)
ts[i].join();
}

int main() {
int cores = thread::hardware_concurrency();
test(1, 1, 1);
test(2, 1, cores);
test(3, cores, 1);
}

------------

package main

import (
"fmt"
"math"
"runtime"
"sync"
"time"
)

func calc() {
pi := float64(0)
for k := 0; k < 1e7; k++ {
pi += math.Pow(-1, float64(k)) / (2*float64(k) + 1)
}
pi *= 4
fmt.Println("pi", pi)
}

func Test(id int, threads int, counter int) {
var wg sync.WaitGroup
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
start := time.Now()
for j := 0; j < counter; j++ {
calc()
}
end := time.Now()
fmt.Println(id, end.Sub(start))
defer wg.Done()
}()
}
wg.Wait()
}

func main() {
cores := runtime.NumCPU()
Test(1, 1, 1)
Test(2, 1, cores)
Test(3, cores, 1)
}

【 在 netghost (Up to Isomorphism) 的大作中提到: 】
: 这种程序的point是啥我从来是不懂的。
:
:
:
:
:
:
:
:
:

k
kkkuai

scala=java≈clj跑起来,jvm上的compilers可以的。

----

object HelloWorld extends App {
val calc = (id :Int, concurrent :Boolean) => {
val start = System.nanoTime
var pi = 0d
for ( k <- 0="" to="" 1e7.toInt="" pi="Math.pow(-1," k="" 2="" k="" 1=""> pi *= 4
val end = System.nanoTime
println(f"id=$id%d pi=$pi%6f concurrent=$concurrent%5b time=${end-
start}%10d")
}

val test = (id :Int) => calc(id, false)

val testConcurrent =(id: Int) => {
val cores = Runtime.getRuntime.availableProcessors;
for (i <- 1="" to="" cores="" new="" Thread=""> calc(id, true)).start
}

test(1);
testConcurrent(2);
}

----

(defn calc [id concurrent]
(let [start (System/nanoTime)
pi (* 4 (loop [k 0 res 0.]
(if (< k (int 1e7))
(recur (inc k) (+ res (/ (Math/pow -1 k) (inc (* 2 k)))))
res)))
end (System/nanoTime)]
(.printf (System/out)
"id=%d pi=%6f concurrent=%5b time=%10dn"
(into-array Object [id pi concurrent (- end start)]))))

(defn test' [id] (calc id false))

(defn test-concurrent [id]
(let [cores (. (Runtime/getRuntime) availableProcessors)]
(dotimes [_ cores] (.start (Thread. #(calc id true))))))

(test' 1)
(test-concurrent 2)

【 在 kkkuai (sknow) 的大作中提到: 】
: 算个pi,java》c++(已O3)》go,可能java有个牛pow。
: -------------
: import java.util.concurrent.Executors;
: import java.util.concurrent.Future;
: public class Test2 {
: int id, threads, counter;
: public Test2(int id, int threads, int counter) {
: this.id = id;
: this.threads = threads;
: this.counter = counter;
: ...................