Perl 6 程式設計教學:共時性 (Concurrency) (高階 API)

PUBLISHED ON JAN 20, 2018 — PROGRAMMING
FacebookTwitter LinkedIn LINE Skype EverNote GMail Email Email

    由於 CPU 的時脈已經到物理上限,現在的硬體都往多核心、多 CPU 發展。同樣地,單一的大型伺服器相當昂貴,而且擴充量有限,使用多台主機組成的叢集則相對易於擴充。然而,若程式碼沒有使用共時性 (concurrency) 的特性來撰寫,則無法真正發揮平行處理 (parallel computing) 所帶來的效能提升。

    Perl 6 提供兩套共時性 API,一套是高階 API,另一套則是低階 API。高階 API 提供一些抽象的物件,像是 Promise、Supply、Channel 等,程式設計者不需擔心底層的虛擬機器實作,可專心在撰寫共時性程式上。低階 API 則提供較為傳統的共時性物件,包括 Thread 和 Lock (即 mutex) 等。除非有特別的理由,應優先使用高階 API。

    本文介紹高階 API。

    Promise

    Promise 是 Perl 6 執行共時性程式區塊的基本單位,在別的語言也稱為 Future。

    建立 Promise

    以下範例啟動一個 Promise:

    my $p = Promise.start({
        say "Hello from a Promise";
    });
    
    await $p;
    

    由於 Promise 是共時執行,我們會用 await 等待該 Promise 結束。

    也可以用以下較為簡略的寫法:

    my $p = start {
        say "Hello from a Promise";
    };
    
    await $p;
    

    Promise 也可以回傳資料,如下例:

    my $p = start {
        (1..10).reduce({ $^a + $^b });
    }
    
    my $sum = await $p;
    $sum == 55 or die "Wrong value";
    

    如果需要確保 Promise 的順序,可以使用 then 方法,如下例:

    my $p = Promise.start({
        say "Hello from first promise";
    })
    .then({
        say "Hello from second promise";
    });
    
    await $p;
    

    由於 Promise 區塊是共時發生的,無法保證其順序。例如,多執行幾次以下程式,會發現每次順序會略為不同:

    my $p1 = start {
        say "Message from promise 1";
    };
    
    my $p2 = start {
        say "Message from promise 2";
    };
    
    my $p3 = start {
        say "Message from promise 3";
    };
    
    await $p1, $p2, $p3;
    

    Promise 可指定在特定秒數後才執行,如下例:

    my $p = Promise
        .in(1)
        .then({ say "Hello from promise"; });
    
    await $p;
    

    Promise 的狀態

    Promise 建立後,有三個狀態:

    • Planned:Promise 建立時的狀態
    • Kept:Promise 執行成功
    • Broken:Promise 執行失敗

    必要時,可以手動控制狀態。在下例中,我們將 Promise 的狀態調到 Kept

    my $p1 = Promise.new;
    say $p1.status;
    
    $p1.keep("result");
    say $p1.status;
    say $p1.result;
    

    若 Promise 狀態變為 Broken,則要以 CATCH 區塊去接,如下例:

    my $p2 = Promise.new;
    
    $p2.break('oh no');
    say $p2.status;
    
    # It dies because the promise is broken.
    say $p2.result; 
    
    CATCH { default { say .^name, ': ', .Str } };
    

    (待確認…)

    my $promise1 = Promise.new();
    my $promise2 = $promise1.then(-> $v { say "Handled but : "; say $v.result});
    
    $promise1.break("First Result");
    try $promise2.result;
    
    say $promise2.cause;
    

    控制多個 Promise

    我們可以用 Promise 控制其他 Promise。例如,使用 allof 方法可以等待所有 Promise 執行結束後才結束該Promise:

    my @ps;
    
    for 1..5 -> $i {
        push @ps, start {
            sleep $i;
        }
    }
    
    my $p = Promise.allof(@ps);
    
    await $p;
    say "All done";
    

    使用 anyof 可以在其中一個 Promise 結束時即結束該 Promise:

    my $p = Promise.anyof(
        Promise.in(3),
        Promise.in(8600),
    );
    
    await $p;
    say "All done";
    

    Supply

    Supply 用來撰寫非同步資料流 (asynchronous data streaming),在別的程式語言稱為 events 或 reactive programming。

    建立 Supply

    Supply 以 tapemit 方法搭配使用。Perl 6 提供兩種方法來建立 supply。

    可透過 Supplier 工廠建立一個 live supply,傳入資料時才執行:

    my $supplier = Supplier.new;
    my $supply   = $supplier.Supply;
     
    $supply.tap( -> $v { say $v });
     
    for 1 .. 10 {
        $supplier.emit($_);
    }
    

    也可以透過 supply 函式建立一個 on-demand supply,呼叫時即執行:

    my $supply = supply {
        for 1 .. 10 {
            emit($_);
        }
    };
    
    $supply.tap( -> $v { say $v });
    

    On-demand supply 可重覆呼叫,如下例:

    my $supply = supply {
        for 1 .. 10 {
            emit($_);
        }
    };
    
    $supply.tap( -> $v { say "First : $v" });
    $supply.tap( -> $v { say "Second : $v" });
    

    關閉 supply

    tap 方法會回傳 tap 物件,我們可以藉由 tap 物件關閉 supply,如下例:

    my $supplier = Supplier.new;
    my $supply   = $supplier.Supply;
     
    my $tap = $supply.tap( -> $v { say $v });
     
    $supplier.emit("OK");
    $tap.close;
    $supplier.emit("Won't trigger the tap");
    

    定期傳輸資料

    使用 interval 方法,可以控制 supply 在特定的秒數定期傳出資料,每次傳出的數字會遞增:

    my $supply = Supply.interval(2);
    $supply.tap(-> $v { say $v });
    sleep 10;
    

    grep

    透過 grep 方法可從原來的 supply 過濾掉某些元素後再建立新的 supply,如下例:

    my $supplier = Supplier.new;
    my $supply = $supplier.Supply;
    
    $supply.tap(-> $v { say "Original : $v" });
    
    my $odd_supply = $supply.grep({ $_ % 2 });
    $odd_supply.tap(-> $v { say "Odd : $v" });
    
    my $even_supply = $supply.grep({ not $_ % 2 });
    $even_supply.tap(-> $v { say "Even : $v" });
    
    for 0 .. 10 {
        $supplier.emit($_);
    }
    

    map

    透過 map 方法可將 supply 的元素轉換後傳回新的 supply,如下例:

    my $supplier = Supplier.new;
    my $supply = $supplier.Supply;
    
    $supply.tap(-> $v { say "Original : $v" });
    
    my $sqr_supply = $supply.map({ $_ ** 2 });
    $sqr_supply.tap(-> $v { say "Square : $v" });
    
    for 0 .. 10 {
        $supplier.emit($_);
    }
    

    改變流程

    my $supplier = Supplier.new;
    my $supply = $supplier.Supply;
    
    $supply.tap: -> $v { say $v; },
        done => { say 'Job is done.' },
        quit => {
            default { say "App Error: ", $_.message }
        };
    
    for 0 .. 10 {
        $supplier.emit($_);
    }
    
    $supplier.done;
    

    react 區塊

    我們也可以用 react 區塊定期傳出資料,當呼叫 done() 函式時結束此區塊。

    react {
        whenever Supply.interval(2) -> $v {
            say $v;
            done() if $v == 4;
        }
    }
    

    react 區塊也可以接受一個串列:

    react {
        whenever Supply.from-list(1..10) -> $v {
            say $v;
        }
    }
    

    Channel

    Channel 用於共時性程式中傳遞數據。

    建立 Channel

    以下範例程式建立一個 channel 並傳送資料:

    my $c = Channel.new;
    $c.send("Hello from channel");
    say $c.receive;
    

    list 方法會回傳 channel 所有的數據,直到該 channel 關閉。

    my $c = Channel.new;
    await (^10).map: -> $n {
        start {
            sleep $n;
            $c.send($n);
        }
    }
    $c.close;
    for $c.list -> $r {
        say $r;
    }
    

    poll

    我們也可以利用 poll 進行非阻塞性回傳資料,如下例:

    my $c = Channel.new;
    
    # Start three Promises that sleep for 1..3 seconds, and then
    # send a value to our Channel
    ^3 .map: -> $v {
        start {
            sleep 3 - $v;
            $c.send: "$v from thread {$*THREAD.id}";
        }
    }
    
    # Wait 3 seconds before closing the channel
    Promise.in(3).then: { $c.close }
    
    # Continuously loop and poll the channel, until it's closed
    loop {
        if $c.poll -> $item {
            say "$item received after {now - INIT now} seconds";
        }
        elsif $c.closed {
            last;
        }
    }
    

    react 區塊

    Channel 也可以取代 supply,和 react 區塊搭配,如下例:

    my $channel = Channel.new;
    my $p = start {
        react {
            whenever $channel {
                say $_;
            }
        }
    }
    
    await (^10).map: -> $r {
        start {
            sleep $r;
            $channel.send($r);
        }
    }
    
    $channel.close;
    await $p;
    

    或是將 channel 和 supply 結合,如下例:

    my $supplier = Supplier.new;
    my $supply   = $supplier.Supply;
    my $channel = $supply.Channel;
     
    my $p = start {
        react  {
            whenever $channel -> $item {
                say "via Channel: $item";
            }
        }
    }
     
    await (^10).map: -> $r {
        start {
            sleep $r;
            $supplier.emit($r);
        }
    }
     
    $supplier.done;
    await $p;
    

    Proc::Async

    Proc::Async 可用非同步的方式執行外部指令,如下例:

    my $proc = Proc::Async.new('echo', 'foo', 'bar');
     
    $proc.stdout.tap(-> $v { print "Output: $v" });
    $proc.stderr.tap(-> $v { print "Error:  $v" });
     
    say "Starting...";
    my $promise = $proc.start;
     
    await $promise;
    say "Done.";
    

    如果需要從程式寫入標準輸入,可加入 :w 來修飾,如下例:

    my $proc = Proc::Async.new(:w, 'grep', 'foo');
     
    $proc.stdout.tap(-> $v { print "Output: $v" });
     
    say "Starting...";
    my $promise = $proc.start;
     
    $proc.say("this line has foo");
    $proc.say("this one doesn't");
    
    $proc.close-stdin;
    await $promise;
    say "Done.";