Ruby多线程库(Thread)使用方法详解

Ruby多线程库(Thread)使用方法详解

Thread是Ruby的线程库,Thread库已经内置在Ruby中,但如果想要使用线程安全的Queue、Mutex以及条件变量等,则需要手动require 'thread'

主线程main

默认情况下,每个Ruby进程都具备一个主线程main,如果没有创建新的线程,所有的代码都将在这个主线程分支中执行。

使用Thread.main()类方法可获取当前线程组的主线程,使用Thread.current()可以获取当前正在执行的线程分支。使用Thread.list()可获取当前进程组中所有存活的线程。

p Thread.main p Thread.current p Thread.main == Thread.current =begin #<Thread:0x0000000001d9ae58 run> #<Thread:0x0000000001d9ae58 run> true =end

可见,线程其实是一个Thread类的实例对象。

创建Ruby线程

使用Thread库的new()、start()、fork()可创建线程,它们几乎等价,且后两者是别名关系。

创建线程时需传递一个代码块或Proc对象参数, 它们是要执行的任务,它们将在新的线程分支中执行。如果需要,可以为代码块或Proc对象传递参数。

arr=[] a,b,C=1,2,3 Thread.new(a,b,c) { |d,e,f| arr << d << e << f } sleep 1 p arr #=> [1,2,3]

如果主线程先执行完成,主线程将直接退出,主线程的退出将会终止进程,使得其它线程也会退出。

Thread.new {puts "hello"} puts "world"

上述代码几乎总是会输出world,然后退出,主线程的退出使得子线程不会输出"hello"。之所以总是会输出world而不是输出hello,这和Ruby的线程调度有关,在后面的文章中会详细解释Ruby中的线程调度。

join()和value()等待线程

如果想要等待某个线程先执行完成,可使用t.join(),如果线程t尚未退出,则join()会阻塞。可以在任意线程中调用t.join(),谁调用谁等待。

t = Thread.new { puts "I am Child" } t.join # 等待子线程执行完成 puts "I am Parent"

还可以将多个线程对象放进数组,然后执行遍历join,另一种常见的做法是使用map{}.each(&:join)的方式:

threads = [] 3.times do |i| # 将多个线程加入到数组中 threads << Thread.new { puts "Thread #{i}" } end # 在main线程中join每个线程, # 因此只有3个线程全都完成后,main线程才会继续,即退出 threads.each(&:join) =begin Thread 1 Thread 0 Thread 2 =end # 另一种常见方式 3.times.map {|i| Thread.new { puts "Thread #{i}" } }.each(&:join) Array.new(3) {|i| Thread.new { puts "Thread #{i}" } }.each(&:join)

t.value()t.join()类似,不同之处在于t.value()在内部调用t.join()等待线程t之后,还会在等待成功时取得该线程的返回值。

a = Thread.new { 2 + 2 } p a.value #=> 4

注意,对于Ruby来说,无论是否执行join()操作,任务执行完成的线程都会马上被操作系统回收(从OS线程表中删除),但被回收的线程仍然能够使用value()方法来获取被回收线程的返回值。之所以会这样,我个人猜想,也许是因为Ruby内部已经帮我们执行了join操作并将线程返回值保存在Ruby内部,这样对于用户来说就更加安全,而且用户执行join()或value()操作,可能是在等待Ruby内部的这个值的出现。

线程的异常处理

默认情况下,当某个非main线程中抛出异常后,该线程将因异常而终止,但是它的终止不会影响其它线程。

t = Thread.new {raise "hello"} # 抛出异常 sleep 1 # 仍然睡眠1秒后退出

如果使用了t.join()t.value()去等待抛出异常的线程t,异常将会传播给调用这两个方法的线程。例如主线程调用t.join,如果t会抛出一次异常,那么主线程在等待过程中还会抛出一次异常。

t = Thread.new {raise "hello"} # 抛出异常 t.join() # 子线程抛异常后,main线程也抛异常

如果想要让任意线程出现异常时终止整个程序,可设置类方法Thread.abort_on_exception为true,它会在任意子线程抛出异常后自动传播给main线程,从而终止进程:

Thread.abort_on_exception = true Thread.new { raise "Error" } sleep 1 # 不会睡眠完1秒,而是子线程异常后立即异常退出

如果想要让某个特定的线程出现异常时终止整个程序,可设置同名的实例方法t.abort_on_exception为true,只有t线程异常时才会终止程序。

t1 = Thread.new { raise "Error from t1" } t1.abort_on_exception = true sleep 1

另外,线程实例方法t.raise()可以直接在线程t抛出异常。

需注意,Ruby线程有一个巨大的缺点:无论是raise抛出异常还是各种终止(比如kill、exit),都不会执行ensure子句。

线程的状态和生命周期

Ruby中的线程具有5种状态,可通过t.status()查看,该方法有5种对应的返回值:

- run: 线程正在运行(running)或可运行(runnable) - sleep: 线程处于睡眠态,比如阻塞(如sleep,mutex,io block) - false: 线程正常退出后的状态,包括执行完流程、手动退出(t.exit)、信号终止(t.kill) - nil: 线程因抛出异常(比如raise)而退出的状态 - aborting: 线程被完全kill之前的过渡状态,不考虑这种状态的存在

另外,还有两种统称状态:

alive:存活的线程,等价于run + sleep

stop:已停止的线程,等价于sleep + dead(false+nil)

可分别使用alive?()stop?()来判断线程是否属于这两种统称状态。

此外:

Kernel.sleep:让当前线程睡眠指定时长,无参数则永久睡眠,线程将进入睡眠队列 Thread.stop:让当前线程睡眠,进入睡眠队列,等价于无参数的sleep Thread.pass:转让CPU,当前线程进入就绪队列而不是睡眠队列 t.run:唤醒线程t使其进入就绪队列,同时让当前线程放弃CPU,调度程序将重新调度 t.wakeup:唤醒线程t使其进入就绪队列,但不会让当前线程放弃CPU,调度程序将不会立即重新调度 Thread.kill:终止指定线程,它将不再被调度 Thread.exit:终止当前线程,它将不再被调度 t.exit,t.kill,t.terminate:终止线程t,t将不再被调度

几个注意事项:

这里5个终止线程的方式效果上是完全等价的,三个实例方法是别名关系,而两个类方法的内部也都是调用线程对象的kill

最好要不加区分地看待run和wakeup

对于Thread.pass,除了知道它转让CPU的行为是确定的,不要对它假设任何额外的行为,比如不要认为出让CPU后一定会调度到其它Ruby线程,很有可能会在调度其它一些非Ruby线程后再次先调度到本线程而非其它Ruby线程

需注意,无论是raise抛出异常还是各种终止(比如kill、exit),都不会执行ensure子句

线程私有变量和局部变量

Ruby进程内的所有线程共享进程的虚拟地址空间,所以共享了一些数据。

但线程是语句块或者Proc对象,所以语句块内部创建的变量是在当前线程栈内部的,是每个线程私有的变量。

# 主线程中的变量 a = 1 # 子线程 t1 = Thread.new(3) do |x| a += 1 b=3 x=4 end # 主线程 t1.join p a # 2 #p b # 报错,b不存在 #p x # 报错,x不存在

Ruby为线程提供了局部变量共享的概念,每个线程对象都可以有自己的局部数据空间(即线程本地变量),线程对象的局部空间互不影响,比如两个线程中同时进行正则匹配,两个线程的$~是不一样且互不影响的。

线程对象t的局部数据空间是t[key]=value,即一个名为t的hash结构,因为对象t是可以共享的,所以它的局部空间也是共享的。

t1 = Thread.new do t = Thread.current t[:name] = "junmajinlong" t[:age] = 23 end t1.join p t1.keys # [:name, :age] p t1.key? :gender # false p t1[:name] # "junmajinlong" t1[:age] = 24 p t1[:age] # 24

所以,有这么几个方法:

t[key] t[key]= t.keys t.key?

此外还有一个fetch()方法,类似于Hash的fetch(),默认情况下访问不存在的key会异常,可指定默认值或通过语句块返回默认值。

严格来说,从Ruby 1.9出现Fiber之后,t[]不再是线程本地变量(thread-local),而是纤程(Fiber)本地变量(fiber-local)。但也支持使用线程本地变量:

t.thread_variables t.thread_variable? t.thread_variable_get t.thread_variable_set 线程组

默认情况下,所有线程都在默认的线程组中,这个默认线程组是Ruby程序启动时创建的。可使用ThreadGroup::Default获取默认线程组。

t1 = Thread.new do Thread.stop end p t1.group p Thread.current.group p ThreadGroup::Default =begin #<ThreadGroup:0x00000000019bcb60> #<ThreadGroup:0x00000000019bcb60> #<ThreadGroup:0x00000000019bcb60> =end

使用ThreadGroup.new可创建一个自定义的线程组

使用tg.add(t)可将线程t加入线程组tg,这将会从原来的线程组移除t再加入新组tg

使用tg.list可列出线程组tg中的所有线程

使用t.group可获取线程t所属的线程组

子线程会继承父线程的线程组,即子线程也会加入父线程所在的线程组

tg = ThreadGroup.new t1 = Thread.new { Thread.stop } t2 = Thread.new { Thread.stop } tg.add t1 tg.add t2 pp tg.list pp t1.group =begin [#<Thread:0x000000000196c480 a.rb:4 sleep_forever>, #<Thread:0x000000000196c3b8 a.rb:5 sleep_forever>] #<ThreadGroup:0x000000000196c520> =end

线程组还有一个功能:可使用tg.enclose封闭线程组tg,封闭后的线程组将不允许内部线程移出加入其它组,也不允许外界线程加入该组,只允许在该组中创建新线程。使用tg.enclosed?测试线程组tg是否已封闭。

其实,使用线程组可以将多个线程分类统一管理,线程组本质是一个线程数组加一些额外属性。比如,可以为线程组定义一些额外的针对线程组中所有线程的功能:wakeup组中的所有线程、join所有线程、kill所有线程。

class ThreadGroup def wakeup list.each(&:wakeup) end def join list.each { |th| th.join if th != Thread.current } end def kill list.each(&:kill) end end

更多关于Ruby多线程知识请查看下面的相关链接

推荐阅读