蒼時弦也
蒼時弦也
資深軟體工程師
發表於

淺談 Ruby 的 Fiber(六)

經過前面幾篇文章的介紹,我們已經初步的了解 Fiber 的性質。這系列的文章目標是利用 Fiber 實現再不透過 Thread 或者 Process 的情境,來實現支援多人連線的 TCP 聊天伺服器。

從這一篇開始,我們就要正式的來挑戰完整的實作了!

在開始之前,我們已經注意到前幾篇的程式碼已經開始有點複雜而且不好維護,所以我們要先做兩件事情來改善這個問題。

  1. 釐清功能
  2. 重構

功能分析

因為 Fiber 的特性,我們必須在所有遭遇到 Blocking I/O 的情境下轉為 Nonblocking I/O 來操作,也因此我們回來看一下前面幾篇需要處理 Blocking I/O 的情境。

  1. 接受連線的 #accept 行為
  2. 讀取使用者資料的 #gets 行為

為了能夠實現聊天室功能,我們至少還會需要再加入傳送資料給使用者的 #puts 行為。

而這些動作,我們都需要透過一個統一的物件來處理。

我們可以簡單的把他整理成類似像這樣的行為流程圖。

FlowchartDiagram1.png

如果照我們原來的做法,會發現很難統一管理 Fiber 來在可以操作時執行對應的動作,所以上圖執行 Fiber.yield 的部分,我們會用一個物件來做統一管理,其他部分則可以先維持原樣。

重構

首先,我們先嘗試實現一個 Selector 來將可以讀取或者寫入的 I/O 物件找出來。

修改後的程式碼大致上會像這樣,我們提供了一個 #register 方法讓暫時無法讀取的物件被記錄下來。

 1require 'socket'
 2require 'fiber'
 3
 4# :nodoc:
 5class Selector
 6  def initialize
 7    @fibers = {}
 8  end
 9
10  def register(io)
11    @fibers[io] = Fiber.current
12    Fiber.yield
13  end
14
15  def resume
16    readable, = IO.select(@fibers.keys)
17    readable.each do |io|
18      @fibers[io].resume
19      @fibers.delete(io)
20    end
21  end
22end
23
24selector = Selector.new
25server = TCPServer.new 3000
26
27loop do
28  begin
29    selector.resume
30
31    client = server.accept_nonblock
32    client.puts 'Hello World'
33
34    Fiber.new do
35      buffer ||= ''
36      begin
37        buffer << client.read_nonblock(1024)
38        puts buffer if buffer.include?("\n")
39      rescue IO::WaitReadable
40        selector.register(client)
41      end
42    end.resume
43  rescue IO::WaitReadable
44    sleep 1
45    retry
46  end
47end

不過這樣是無法正確執行的,因為 IO.select 行為是一個 Blocking I/O 的行為,不過我們可以將大量的 I/O 物件一次性的選取,只要有一個符合條件就可以解除。

而這段程式碼出問題的主因是,當開始後就會進入 IO.select 的阻塞狀態,但是伺服器的阻塞狀態並沒有被加入到其中管理,而造成無法正確運行。

因此,我們要將原本的程式碼再做出一些修正。

 1Fiber.new do
 2  loop do
 3    begin
 4      client = server.accept_nonblock
 5      client.puts 'Hello World'
 6
 7      Fiber.new do
 8        buffer ||= ''
 9        begin
10          buffer << client.read_nonblock(1024)
11          puts buffer if buffer.include?("\n")
12        rescue IO::WaitReadable
13          selector.register(client)
14          retry
15        end
16      end.resume
17    rescue IO::WaitReadable
18      selector.register(server)
19      retry
20    end
21  end
22end.resume
23
24loop do
25  selector.resume
26end

不過修改之後,卻發現因為加入了 Fiber.new 給伺服器後,原本的 retryloop 的角色似乎有點微妙,如果不使用 loop 的話,成功連線後就不會嘗試等待下一個新連線,而失敗的話不使用 retry 一樣也不會繼續嘗試處理新的連線,這樣整個工作分配變得有點混亂。

解析

要解決這樣的問題,最為理想的狀態是在 #accept_nonblock 的下一行馬上使用 Fiber.yield 以便 Fiber#resume 發生時能夠繼續還未完成的動作。

在 Ruby 裡面大部分的 Nonblocking I/O 方法都提供了 exception: false 的選項,讓我們達成這個條件。

小結

雖然開始嘗試重構,但是馬上又發現程式碼變的複雜,在下一篇我們會先嘗試採取 exception: false 的做法調整 Fiber 繼續執行的流程,然後再做一次重構讓程式碼恢復乾淨的狀態。