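I couldn't find a decent ThreadPool implementation for Ruby, so I wrote my own (based partly on the code here: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276, but changed to use wait/signal, plus other changes to implement ThreadPool shutdown). However, after running for a while (with 100 threads and about 1300 tasks processed), it dies with a deadlock at line 25, where it waits for a new job. Any ideas why this happens?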
require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
        worker = get_worker
        if worker
          return worker.set_block(block)
        else
          # Wait for a free worker
          @cv.wait(@mutex)
        end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private

  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
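OK, so the main question for this implementation is: how do you make sure no signal is lost, and avoid deadlocks?
In my experience this is really hard to get right with condition variables and mutexes, but easy with semaphores. As it happens, Ruby implements an object called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation: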
require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            # Wait only while there is no job: set_block may already have
            # run (and signalled) before this thread got here, and a
            # spurious wakeup must not be mistaken for a new job
            @cv.wait(@mutex) while @running && @block.nil?
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end
end
Here is a simple test:
tp = ThreadPool.new 500
(1..1000).each do |i|
  tp.process do
    (2..10).inject(1) { |memo, val| sleep(0.1); memo * val }
    print "Computation #{i} done. Nb of tasks: #{tp.size}\n"
  end
end
tp.shutdown
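The claim above, that a Queue avoids the lost-signal problem, can be checked with a small standalone sketch (not part of the answer's code). It shows two things: an item pushed before any consumer is waiting is still delivered, and a Queue pre-filled with tokens behaves like a counting semaphore. The limit of 2 and the 10 simulated jobs are arbitrary choices for illustration.

```ruby
require 'thread' # Queue and Mutex are core classes in Ruby 2.x; this require is harmless

# 1) A pushed item stays in the Queue until someone pops it, so a wake-up
#    sent before any consumer is waiting is not lost.
handoff = Queue.new
handoff << :job_ready                      # pushed before any consumer exists
early = Thread.new { handoff.pop }.value   # pop returns the stored item at once

# 2) A Queue pre-filled with N tokens works as a counting semaphore.
limit  = 2
tokens = Queue.new
limit.times { tokens << :token }

lock     = Mutex.new
running  = 0
max_seen = 0

threads = Array.new(10) do
  Thread.new do
    token = tokens.pop                     # blocks while `limit` jobs are running
    begin
      lock.synchronize do
        running += 1
        max_seen = [max_seen, running].max
      end
      sleep 0.01                           # simulated work
      lock.synchronize { running -= 1 }
    ensure
      tokens << token                      # give the slot back
    end
  end
end
threads.each(&:join)

puts early.inspect
puts "max concurrent jobs: #{max_seen}"
```

The semaphore form is what the answer's pool uses implicitly: its Queue of idle workers holds one "token" per free worker, and pop blocks until one is returned.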
You could try the work_queue gem, which is designed to coordinate work between a producer and a pool of worker threads.
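The top answer's code has been a big help over the years. Here it is updated for Ruby 2.x and improved with thread identification. How does that improve things? When each thread has an ID, you can pair the ThreadPool with an array that stores arbitrary information, indexed by that ID. Some ideas: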
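- No array: typical ThreadPool usage. Even with the GIL, it makes threading dead easy to code and is very useful for high-latency applications such as high-volume web crawling.
- ThreadPool and array sized to the number of CPUs: makes it easy to fork processes that use all the CPUs.
- ThreadPool and array sized to the number of resources: for example, each array element represents one processor across a pool of instances, so if you have 10 instances with 4 CPUs each, the TP can manage work for 40 subprocesses.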
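For these last two, don't think of the threads as doing the work; think of the ThreadPool as managing subprocesses that do the work. The management work is lightweight, and once it is combined with subprocesses, the GIL hardly matters.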
With this class, you can write a cluster-based MapReduce in about a hundred lines of code! The code is short, although a bit mind-bending. Hope it helps.
# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     # or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()
class ThreadPool
  require 'thread'

  class ThreadPoolWorker
    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            # block until there is work to do; testing @block first means a
            # job assigned before this thread reached the wait is not missed
            @cv.wait(@mutex) while @running && @block.nil?
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end

  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = [] # array to hold workers
    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
    # queue up workers (workers in queue are idle and available to
    # work). queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }
    # no startup sleep needed: the guarded wait in the worker picks up a
    # job that arrives before the worker thread has started waiting
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # true when no worker is idle (every worker is busy)
  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish
  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end
  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end
end
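The id-indexed idea from the list above can be sketched with nothing but a Queue of jobs. In this hypothetical example (not the class above), the pool is sized with Etc.nprocessors, each job receives the id of the worker running it, and worker id is the only writer of slot id in a shared array. In the cluster scenario that slot might instead hold a host name or a subprocess handle; that mapping is an assumption for illustration.

```ruby
require 'etc' # Etc.nprocessors is available since Ruby 2.2

# Hypothetical sketch: worker `id` owns slot `id` of `done_by`.
pool_size = Etc.nprocessors
jobs      = Queue.new
results   = Queue.new
done_by   = Array.new(pool_size, 0) # done_by[id] is written only by worker id

workers = Array.new(pool_size) do |id|
  Thread.new do
    loop do
      job = jobs.pop
      break if job == :stop          # one :stop marker per worker ends it
      results << job.call(id)        # the job is told which worker/slot runs it
      done_by[id] += 1
    end
  end
end

20.times { |i| jobs << ->(id) { [i, i * i, id] } }
pool_size.times { jobs << :stop }    # queued after all jobs (FIFO)
workers.each(&:join)

puts "jobs per worker: #{done_by.inspect}"
```

Writing to distinct indices of a shared Array from several threads is fine in practice on MRI because of the GIL; on JRuby or TruffleRuby that is not guaranteed, and collecting everything through a Queue (as results does) is the safer choice.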
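OK, the problem seems to be in your ThreadPool#signal method. What can happen is:
1- All your workers are busy and you try to process a new job
2- get_worker returns nil, because no worker is free
3- A worker finishes and signals, but the signal is lost because the ThreadPool is not yet waiting for it
4- You reach @cv.wait(@mutex) in process and wait, even though there is now a free worker.
The error is that the worker signals without holding the pool's mutex, so the signal can fire while nobody is listening: after process has found no free worker, but before it has started waiting. ThreadPool#signal should be: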
def signal
  @mutex.synchronize { @cv.signal }
end
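The key fact behind this diagnosis, that ConditionVariable#signal is not remembered when no thread is waiting, can be observed directly. This standalone sketch signals first and waits afterwards, using the timeout argument of ConditionVariable#wait (available in Ruby 2.x) so that it terminates; the 0.2 s timeout is arbitrary.

```ruby
require 'thread'

mutex = Mutex.new
cv    = ConditionVariable.new

# Signal while no thread is waiting: the ConditionVariable keeps no record of it.
mutex.synchronize { cv.signal }

# A later wait therefore does not return early; it ends only when the
# timeout expires. Without a timeout it would block forever.
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
mutex.synchronize { cv.wait(mutex, 0.2) }
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts format("waited %.2f s for a signal that had already been sent", waited)
```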
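The same problem exists in the Worker object. What can happen is:
1- The worker has just finished a job
2- It checks whether a job is waiting (get_block): there is none
3- The thread pool hands it a new job and signals... but the signal is lost
4- The worker waits for a signal even though it is marked as busy
You should write your initialize method like this: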
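def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job.
          # Release our own mutex first: ThreadPool#signal takes the pool's
          # mutex, and the pool calls busy?/set_block (our mutex) while holding
          # its own, so holding both here could deadlock (lock-order inversion).
          @mutex.unlock
          @callback.signal
          @mutex.lock
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end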
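Next, Worker#get_block and Worker#reset_block should no longer be synchronized: they are now called only while the worker already holds its mutex, and Ruby's Mutex is not reentrant (locking it again from the same thread raises a ThreadError). Together with holding the mutex across the loop, this means no block can be assigned to the worker between testing for a block and waiting for the signal.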
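The pattern the corrected worker relies on, testing the condition and entering the wait under the same mutex, can be isolated in a small hypothetical one-slot Mailbox class (not from the original code). Because take re-checks @item in a loop while holding the mutex, a put that happens before anyone waits is not lost, and a spurious wakeup simply goes back to waiting.

```ruby
require 'thread'

# Hypothetical one-slot mailbox illustrating a guarded wait.
class Mailbox
  def initialize
    @mutex = Mutex.new
    @cv    = ConditionVariable.new
    @item  = nil
  end

  def put(item)
    @mutex.synchronize do
      raise "Mailbox already full" unless @item.nil?
      @item = item
      @cv.signal
    end
  end

  def take
    @mutex.synchronize do
      @cv.wait(@mutex) while @item.nil?  # test and wait under the same lock
      item, @item = @item, nil
      item
    end
  end
end

box = Mailbox.new
box.put(:early_job)            # put before anyone is waiting
first = box.take               # not lost: returns immediately

taker = Thread.new { box.take }
sleep 0.05                     # lets the taker block first; not needed for correctness
box.put(:late_job)
second = taker.value
```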
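I'm a bit biased here, but I would suggest modelling this in a process language and model-checking it. Freely available tools include, for example, the mCRL2 toolset (using an ACP-based language), the Mobility Workbench (pi-calculus) and Spin (PROMELA).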
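Otherwise, I suggest stripping out all the code that is not essential to the problem and finding the smallest case in which the deadlock occurs. I doubt that 100 threads and 1300 tasks are necessary to get a deadlock. With a smaller case, a few debug prints will probably give you enough information to solve the problem.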
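One way to shrink the failing case without the program hanging is to run each candidate scenario under a time limit. The sketch below uses a hypothetical helper, finishes_within?, built on Thread#join(limit), which returns nil when the limit expires; the second scenario mimics a worker waiting for a job that never arrives. The limits are arbitrary.

```ruby
require 'thread'

# Hypothetical helper: run a scenario in its own thread and report whether it
# finished within `seconds`, instead of hanging the whole program.
def finishes_within?(seconds, &scenario)
  runner = Thread.new(&scenario)
  if runner.join(seconds)   # join returns nil when the limit expires
    true
  else
    runner.kill             # abandon the stuck scenario
    false
  end
end

# Scenario 1: a job handed over through a Queue always completes.
handoff_finished = finishes_within?(1) do
  q = Queue.new
  t = Thread.new { q.pop }
  q << :job
  t.join
end

# Scenario 2: a worker waits for a job that is never submitted.
idle_wait_finished = finishes_within?(0.5) do
  Queue.new.pop             # nobody ever pushes: blocks like "wait for a new job"
end

puts "handoff finished: #{handoff_finished}"
puts "idle wait finished: #{idle_wait_finished}"
```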