多线程并发问题还是很常见的。有的时候总觉得,我只用一个静态的序列对象就不会出现并发问题了。但是实际一测,发现原来不然。
下面写了一个简单的测试程序,使用多线程并发计数和多线程消费计数。程序使用Scala语言写的,但是跟Java代码基本是一样的,可以放心阅读。
类Queue使用了一个队列对象,主要用于按顺序写入数据和读出数据。
类Producer用于往队列里写入数据。
类Consumer用于从队列里读出数据。
通过两种情况判断并发是否真的发生了:
- 通过Producer并发产出已知数量的消息,检验最终队列里消息数量是否一致。
- 记录Consumer消费的消息数量,和队列里的消息总数进行对比,看是否一致。
先分析一下,如果并发不会发生的话,Queue队列里消息总数、Consumer消费的消息总数,都应该等于Producer产出的消息总数。而如果并发发生了,Queue队列里的消息总数应该小于等于Producer产出消息总数,而Consumer消费的消息总数应该大于Queue队列里的消息总数。
下面就上代码,验证一下。
程序入口类Test:
package com.test /** * Test * Created by wanbo on 15/8/25. */ object Test { def main(args: Array[String]) { println("Start up ...") println("Produce >>>") // 使用5个线程生成消息,每个线程生产10W for(i <- Range(0, 5)) new Thread(new Producer).start() Thread.sleep(10000) println(Queue.getSize) Thread.sleep(10000) println(Queue.getSize) println("Consume >>>") for(i <- Range(0, 5)) new Thread(new Consumer).start() Thread.sleep(10000) println("Consumed size:" + ZK.getSize) println("The final size of queue:") println(Queue.getSize) println("All have done!") } }
Queue类:
package com.test import scala.collection.mutable /** * Queue * Created by wanbo on 15/9/15. */ object Queue { private val queue = mutable.Queue[Int]() /** * 入队列一个数,仅作为计数用 */ def add(): Unit ={ queue += 1 } /** * 出队列一个数,仅作为计数用 */ def pop(): Int ={ queue.dequeue() } def getSize: Int ={ queue.size } }
Producer类:
package com.test /** * Producer * Created by wanbo on 15/9/15. */ class Producer extends Runnable { override def run(): Unit = { for(i <- Range(0, 100000)) Queue.add() } }
Consumer类:
package com.test /** * Consumer * Created by wanbo on 15/9/15. */ class Consumer extends Runnable { override def run(): Unit = { // 消费掉所有消息 while (Queue.getSize > 0){ Queue.pop() ZK.add() // ZK这个类是用来计数的,记录消费消息数量 } } }
ZK类也贴出来吧,可以先不管这个类,只知道它能完整计数就可以了。
package com.test import scala.collection.mutable /** * Queue * Created by wanbo on 15/9/15. */ object ZK { private val queue = mutable.Queue[Int]() def add(): Unit ={ queue.synchronized{ queue += 1 } } def getSize: Int ={ queue.size } }
跑一个结果看下:
Start up ...
Produce >>>
216254
216254
Consume >>>
Consumed size:217135
The final size of queue:
0
All have done!
从结果中看到,Producer只生产了216245条消息,而理论上应该产出50W条消息。Consumer消费了217135条消息,而队列Queue中只有216245条。
这个结论跟前面推测的一样,说明并发问题真的发生了。
怎么解决呢,这里有一篇文章,个人觉得很不错分享一下:https://twitter.github.io/scala_school/concurrency.html
我也是看了这篇文章之后,决定选择synchronized解决这个问题。
Queue类,改成如下:
package com.test import scala.collection.mutable /** * Queue * Created by wanbo on 15/9/15. */ object Queue { private val queue = mutable.Queue[Int]() /** * 入队列一个数,仅作为计数用 */ def add(): Unit ={ queue.synchronized { queue += 1 } } /** * 出队列一个数,仅作为计数用 */ def pop(): Int ={ queue.synchronized { queue.dequeue() } } def getSize: Int ={ queue.size } }
再跑下结果:
Start up ...
Produce >>>
500000
500000
Consume >>>
Consumed size:500000
The final size of queue:
0
All have done!
这回看上去,就正常了。
(责任编辑:安博涛)