2019年9月26日木曜日

開発環境

並行コンピューティング技法 ―実践マルチコア/マルチスレッドプログラミング (Clay Breshears(著)、千住 治郎(翻訳)、オライリージャパン)の6章(並列和とセレクション)、6.3(セレクション)、6.3.1(逐次アルゴリズム)をC言語(Windowsスレッド、POSIXスレッド、Pthreadライブラリ、OpenMPライブラリ等)ではなくGo言語(go文、goroutine、channel等)で取り組んでみる。

コード

main_test.go

package main

import (
 "testing"
)

func TestSequentialSelect(t *testing.T) {
 for i := 0; i < 10; i++ {
  numSlice := []int{10, 4, 0, 3, 1, 2, 9, 5, 8, 6, 7, 10}
  want := i
  k := i + 1
  got := SequentialSelect(numSlice, k)
  if got != want {
   t.Errorf("SequentialSelect(%v, %v) got %v, want %v",
    numSlice, k, got, want)
  }
 }
}

func TestConcurrentSelect(t *testing.T) {
 for i := 0; i < 10; i++ {
  numSlice := []int{10, 4, 0, 3, 1, 2, 9, 5, 8, 6, 7, 10}
  want := i
  k := i + 1
  got := ConcurrentSelect(numSlice, k)
  if got != want {
   t.Errorf("SequentialSelect(%v, %v) got %v, want %v",
    numSlice, k, got, want)
  }
 }
}

main.go

package main

import (
 "fmt"
 "log"
 "math/rand"
 "os"
 "strconv"
 "sync"
 "time"
)

func main() {
 numSlice1 := []int{}
 numSlice2 := []int{}
 max, err := strconv.Atoi(os.Args[1])
 if err != nil {
  log.Fatal(err)
 }
 rand.Seed(time.Now().UTC().UnixNano())
 for i := 0; i < max; i++ {
  n := rand.Intn(max)
  numSlice1 = append(numSlice1, n)
  numSlice2 = append(numSlice2, n)
 }
 k := max / 2
 t1 := time.Now().UnixNano()
 n := SequentialSelect(numSlice1, k)
 t1 = time.Now().UnixNano() - t1
 fmt.Printf("逐次アルゴリズム: %vナノ秒 %v\n", t1, n)
 t2 := time.Now().UnixNano()
 m := ConcurrentSelect(numSlice2, k)
 t2 = time.Now().UnixNano() - t2
 fmt.Printf("並行アルゴリズム: %vナノ秒 %v\n", t2, m)
}

var Q int = 5

func SequentialSelect(nums []int, k int) int {
 if len(nums) < Q {
  return sortLessThanQ(nums, k)
 }
 chunkNum := len(nums)/Q + 1
 medians := make([]int, chunkNum)
 i := 0
 for j := 0; j < len(nums)/Q; j++ {
  medians[j] = sortSelect5(nums[i:], 3)
  i += Q
 }
 lastNum := len(nums) - Q*len(nums)/Q
 if lastNum != 0 {
  lastQ := Q * len(nums) / Q
  medians[chunkNum-1] = sortLessThanQ(nums[lastQ:], (lastNum+1)/2)
 } else {
  chunkNum--
  medians = medians[:chunkNum]
 }
 median := SequentialSelect(medians, (chunkNum+1)/2)
 lessEqualGreater, marks := countAndMark(nums, median)
 if lessEqualGreater[0] >= k {
  packSlice := pack(nums, marks, 0)
  median := SequentialSelect(packSlice, k)
  return median
 }
 if lessEqualGreater[0]+lessEqualGreater[1] >= k {
  return median
 }
 packSlice := pack(nums, marks, 2)
 median = SequentialSelect(packSlice,
  k-(lessEqualGreater[0]+lessEqualGreater[1]))
 return median
}
func ConcurrentSelect(nums []int, k int) int {
 if len(nums) < Q {
  return sortLessThanQ(nums, k)
 }
 chunkNum := len(nums)/Q + 1
 medians := make([]int, chunkNum)
 var wg sync.WaitGroup
 for j := 0; j < len(nums)/Q; j++ {
  wg.Add(1)
  go func(j int) {
   defer wg.Done()
   medians[j] = sortSelect5(nums[j*Q:], 3)
  }(j)
 }
 wg.Add(1)
 go func() {
  defer wg.Done()
  lastNum := len(nums) - Q*len(nums)/Q
  if lastNum != 0 {
   lastQ := Q * len(nums) / Q
   medians[chunkNum-1] = sortLessThanQ(nums[lastQ:], (lastNum+1)/2)
  } else {
   chunkNum--
   medians = medians[:chunkNum]
  }
 }()
 wg.Wait()
 median := ConcurrentSelect(medians, (chunkNum+1)/2)
 lessEqualGreater, marks := countAndMark(nums, median)
 if lessEqualGreater[0] >= k {
  packSlice := pack(nums, marks, 0)
  median := ConcurrentSelect(packSlice, k)
  return median
 }
 if lessEqualGreater[0]+lessEqualGreater[1] >= k {
  return median
 }
 packSlice := pack(nums, marks, 2)
 median = ConcurrentSelect(packSlice,
  k-(lessEqualGreater[0]+lessEqualGreater[1]))
 return median
}

func sortLessThanQ(nums []int, k int) int {
 switch len(nums) {
 case 1:
  return nums[0]
 case 2:
  return sortSelect2(nums, k)
 case 3:
  return sortSelect3(nums, k)
 case 4:
  return sortSelect4(nums, k)
 case 5:
  return sortSelect5(nums, k)
 default:
  log.Fatal("Error")
  return -1
 }
}

// 冗長だけど単純なバブルソート
func swap(nums []int, i, j int) {
 t := nums[i]
 nums[i] = nums[j]
 nums[j] = t
}
func sortSelect2(nums []int, k int) int {
 if nums[0] > nums[1] {
  swap(nums, 0, 1)
 }
 return nums[k-1]
}
func sortSelect3(nums []int, k int) int {
 if nums[0] > nums[1] {
  swap(nums, 0, 1)
 }
 if nums[1] > nums[2] {
  swap(nums, 1, 2)
  if nums[0] > nums[1] {
   swap(nums, 0, 1)
  }
 }
 return nums[k-1]
}
func sortSelect4(nums []int, k int) int {
 if nums[0] > nums[1] {
  swap(nums, 0, 1)
 }
 if nums[1] > nums[2] {
  swap(nums, 1, 2)
  if nums[0] > nums[1] {
   swap(nums, 0, 1)
  }
 }
 if nums[2] > nums[3] {
  swap(nums, 2, 3)
  if nums[1] > nums[2] {
   swap(nums, 1, 2)
   if nums[0] > nums[1] {
    swap(nums, 0, 1)
   }
  }
 }
 return nums[k-1]
}
func sortSelect5(nums []int, k int) int {
 if nums[0] > nums[1] {
  swap(nums, 0, 1)
 }
 if nums[1] > nums[2] {
  swap(nums, 1, 2)
  if nums[0] > nums[1] {
   swap(nums, 0, 1)
  }
 }
 if nums[2] > nums[3] {
  swap(nums, 2, 3)
  if nums[1] > nums[2] {
   swap(nums, 1, 2)
   if nums[0] > nums[1] {
    swap(nums, 0, 1)
   }
  }
 }
 if nums[3] > nums[4] {
  swap(nums, 3, 4)
  if nums[2] > nums[3] {
   swap(nums, 2, 3)
   if nums[1] > nums[2] {
    swap(nums, 1, 2)
    if nums[0] > nums[1] {
     swap(nums, 0, 1)
    }
   }
  }
 }
 return nums[k-1]
}

func countAndMark(nums []int, median int) ([]int, []int) {
 lessEqualGreater := make([]int, 3)
 marks := []int{}
 for _, num := range nums {
  if num < median {
   lessEqualGreater[0]++
   marks = append(marks, 0)
  } else if num == median {
   lessEqualGreater[1]++
   marks = append(marks, 1)
  } else {
   lessEqualGreater[2]++
   marks = append(marks, 2)
  }
 }
 return lessEqualGreater, marks
}

func pack(nums []int, marks []int, scanSym int) []int {
 packSlice := []int{}
 for i, mark := range marks {
  if mark == scanSym {
   packSlice = append(packSlice, nums[i])
  }
 }
 return packSlice
}

入出力結果(Bash、cmd.exe(コマンドプロンプト)、Terminal)

$ go test
PASS
ok   _/.../go/並行コンピューティング技法/ch6/main 0.005s
$ go build main.go
$ ./main 100
逐次アルゴリズム: 6000ナノ秒 52
並行アルゴリズム: 80000ナノ秒 52
$ ./main 10
逐次アルゴリズム: 3000ナノ秒 4
並行アルゴリズム: 58000ナノ秒 4
$ ./main 5
逐次アルゴリズム: 2000ナノ秒 1
並行アルゴリズム: 23000ナノ秒 1
$ ./main 4
逐次アルゴリズム: 3000ナノ秒 0
並行アルゴリズム: 1000ナノ秒 0
$ ./main 1
逐次アルゴリズム: 1000ナノ秒 0
並行アルゴリズム: 0ナノ秒 0
$ ./main 10000
逐次アルゴリズム: 1091000ナノ秒 5021
並行アルゴリズム: 4041000ナノ秒 5021
$ ./main 1000000
逐次アルゴリズム: 75483000ナノ秒 500073
並行アルゴリズム: 275047000ナノ秒 500073
$ 

中央値を求める部分を並行化してみたけど、並行アルゴリズムの方がだいぶ遅かった。アルゴリズムを間違えてるか、うまく並行化できてないのか、Go言語のgoroutineで並行化してもこのアルゴリズムの場合効果がないのか。syncパッケージのWaitGroup、Addメソッド、Doneメソッド、Waitメソッドではなく、channelを使った自然な書き方があって、それならもしかしたら逐次アルゴリズムより並行アルゴリズムの方が速くなるかも。

0 コメント:

コメントを投稿

関連コンテンツ