cloverrose's blog

Python, Machine learning, Emacs, CI/CD, Webアプリなど

選択されているTabのIndicatorの色を変える方法

この記事と同じ現象がTabのIndicatorでも起きていて、どの色を変えてもtealAccent[200]から変わらない zenn.dev

変えるにはtabBarThemeを直接設定する

darkTheme: ThemeData(
        useMaterial3: true,
        tabBarTheme: Theme.of(context).tabBarTheme.copyWith(
              indicator: const UnderlineTabIndicator(
                borderSide: BorderSide(width: 2.0, color: Colors.white), // whiteに変わる
              ),
            ),
        colorScheme: const ColorScheme.dark(... // こちらのどこの項目をいじっても色が変わらない

Go言語による並行処理 4.7 ファンアウト、ファンイン

www.oreilly.co.jp

Go言語による並行処理の本は、仕事でよく使う部分とあんまり使わない部分がある。 よく使う部分をコピペで使いやすくしておく。

ファンアウト、ファンインの例はdone channelだけど仕事ではcontextを使うのでそこは変えてある。

package main

import (
    "context"
    "errors"
    "sync"
    "time"
)

type Value struct {
    value int
}

type generator struct {
    cnt          int
    max          int
    waitDuration time.Duration
}

func (g *generator) Gen() (*Value, error) {
    if g.cnt == g.max {
        return nil, errors.New("done")
    }
    time.Sleep(g.waitDuration)
    g.cnt++
    v := &Value{value: g.cnt}
    return v, nil
}

func batchMaker(ctx context.Context, g *generator, batchSize int) <-chan []*Value {
    batchCh := make(chan []*Value)
    go func() {
        defer close(batchCh)
        batch := make([]*Value, 0, batchSize)
        for {
            value, err := g.Gen()
            if err != nil {
                // send last batch
                select {
                case <-ctx.Done():
                case batchCh <- batch:
                }
                return
            }
            batch = append(batch, value)
            if len(batch) < batchSize {
                continue
            }
            select {
            case <-ctx.Done():
                return
            case batchCh <- batch:
                batch = make([]*Value, 0, batchSize)
            }
        }
    }()
    return batchCh
}

func batchConsumer(ctx context.Context, batchCh <-chan []*Value, fn func(context.Context, []*Value) (*Value, error)) <-chan *Value {
    resultCh := make(chan *Value)
    go func() {
        defer close(resultCh)
        for batch := range batchCh {
            result, err := fn(ctx, batch)
            if err != nil {
                return // or ignore error
            }
            select {
            case <-ctx.Done():
                return
            case resultCh <- result:
            }
        }
    }()
    return resultCh
}

func fanIn(ctx context.Context, channels ...<-chan *Value) <-chan *Value {
    var wg sync.WaitGroup
    merged := make(chan *Value)
    multiplex := func(c <-chan *Value) {
        defer wg.Done()
        for v := range c {
            select {
            case <-ctx.Done():
                return
            case merged <- v:
            }
        }
    }
    wg.Add(len(channels))
    for _, c := range channels {
        go multiplex(c)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

func makeCalc(waitDuration time.Duration) func(context.Context, []*Value) (*Value, error) {
    return func(ctx context.Context, batch []*Value) (*Value, error) {
        result := 0
        for _, v := range batch {
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            case <-time.After(waitDuration):
                result += v.value
            }
        }
        return &Value{value: result}, nil
    }
}

func Demo(ctx context.Context, g *generator, batchSize int, numConsumers int, fn func(context.Context, []*Value) (*Value, error)) {
    batchCh := batchMaker(ctx, g, batchSize)
    resultChs := make([]<-chan *Value, numConsumers)
    for i := 0; i < numConsumers; i++ {
        resultChs[i] = batchConsumer(ctx, batchCh, fn)
    }
    pipeline := fanIn(ctx, resultChs...)
    for _ = range pipeline {
    }
}

パフォーマンステストで本当に速くなってるかも確認

package main

import (
    "context"
    "testing"
    "time"
)

func BenchmarkDemo(b *testing.B) {
    type args struct {
        ctx          context.Context
        g            *generator
        batchSize    int
        numConsumers int
        fn           func(context.Context, []*Value) (*Value, error)
    }
    tests := []struct {
        name    string
        args    args
        wantErr bool
    }{
        {
            name: "no sleep",
            args: args{
                ctx:          context.Background(),
                g:            &generator{max: 100 * 80},
                batchSize:    100,
                numConsumers: 1,
                fn:           makeCalc(0),
            },
            wantErr: false,
        },
        {
            name: "no sleep x7",
            args: args{
                ctx:          context.Background(),
                g:            &generator{max: 100 * 80},
                batchSize:    100,
                numConsumers: 7,
                fn:           makeCalc(0),
            },
            wantErr: false,
        },
        {
            name: "maker and consumer is slow",
            args: args{
                ctx:          context.Background(),
                g:            &generator{max: 100 * 80, waitDuration: 1 * time.Millisecond},
                batchSize:    100,
                numConsumers: 1,
                fn:           makeCalc(1 * time.Millisecond),
            },
            wantErr: false,
        },
        {
            name: "maker and consumer is slow x7",
            args: args{
                ctx:          context.Background(),
                g:            &generator{max: 100 * 80, waitDuration: 1 * time.Millisecond},
                batchSize:    100,
                numConsumers: 7,
                fn:           makeCalc(1 * time.Millisecond),
            },
            wantErr: false,
        },
        {
            name: "consumer is slower",
            args: args{
                ctx:          context.Background(),
                g:            &generator{max: 100 * 80},
                batchSize:    100,
                numConsumers: 1,
                fn:           makeCalc(1 * time.Millisecond),
            },
            wantErr: false,
        },
        {
            name: "consumer is slower x7",
            args: args{
                ctx:          context.Background(),
                g:            &generator{max: 100 * 80},
                batchSize:    100,
                numConsumers: 7,
                fn:           makeCalc(1 * time.Millisecond),
            },
            wantErr: false,
        },
    }
    for _, tt := range tests {
        b.Run(tt.name, func(b *testing.B) {
            Demo(tt.args.ctx, tt.args.g, tt.args.batchSize, tt.args.numConsumers, tt.args.fn)
        })
    }
}

in-memoryなcache libraryをさっと調べた

要件

GRPC endpointへの負荷を減らすために、結果をキャッシュしたい。

複数の関数からは参照されないので、Cache objectを適切なStruct内に持っていてSet/Getを呼べれば充分。

必須な要件

  • TTLを設定できて自動的にExpireしたEntryを削除してくれる
  • Thread safe

あったらいいなの要件

  • 充分メンテナンスされている
  • Starが多い
  • Codeがシンプルで無駄な機能がない
  • 簡単に使いたいのでEntryにDeSerialize掛けくて直にSet/Getできる
  • 簡単に使いたいのでDefault TTLを設定できる
  • Valueの型をGenerics的に指定できて、GetしたValueの型Castしなくていい
  • Entryの最大数を設定できて、もし予想外にEntryを追加されてメモリを圧迫しないようにできる
  • Cache hitしなかったときにCallback関数の結果を格納する
    • 自前でLockしなくても同じタイミングでCache missしたときに複数プロセスが重い処理をしないことが保証できるのがいいな

なくてもいい機能

  • KeyごとにTTL変える必要はない
  • 多数のEntryを管理できなくていい
  • Entry数は充分小さいので凝ったReplacement policy (LRUなど)はなくてもいい
  • KeyはString型なので任意の型をKeyに使えなくても良い

選んだLibrary

要件がかなりシンプルなので、充分なシンプルなLibraryでコードを読んで理解できるものを選んだ。

GitHub - patrickmn/go-cache: An in-memory key:value store/cache (similar to Memcached) library for Go, suitable for single-machine applications.github.com

コードがかなりシンプルでどこでLockかけているかとか、いつExpireした結果が消されるかが理解しやすい。

探したときのメモ

GitHub - eko/gocache: ☔️ A complete Go cache library that brings you multiple ways of managing your cachesのMemory Storeでサポートされているライブラリとかも良さそう

GitHub - avelino/awesome-go: A curated list of awesome Go frameworks, libraries and softwareでcacheとついているものを調べた(Starが少なすぎるものは除外)

感想

  • TTLとReplacement policyとGenericsをサポートしていて充分シンプルなLibraryがなかった
  • Cacheライブラリは差別化がいろいろあって、In-memoryなのでコードを読んで挙動を理解するのも頑張ればできうそうなので深堀りしたいってなるライブラリがいくつかあった。

Make tips: Cloud Spanner EmulatorとGCP Spannerの2つの接続先をもつMakefileの作り方

最近仕事でGCPのCloud Spannerを使い始めて勉強中です。今回はSpannerを使うときにMakeでこんなことできるんだ!って知ったことを共有します。

恥ずかしながら今までMakeを体系的に勉強していませんでした。 しかし社内のシステムはだいたいがMakeで運用コマンド作ることが多く、今回のシステムも例にもれずMakefileを書くことになったので、ちゃんと勉強しよう!と思い参考書を読んだらもっと早く読んでおけばよかったーということだらけでした。

しかもなんとオライリーGNU Makeの本無料PDF読める。

www.oreilly.co.jp

以下のブログがPDFを連結して扱いやすくして再配布してくれています。 Kindleに入れたら目次も機能したのでかなり助かりました。

https://www.yokoweb.net/2016/08/28/gnu-make-3rd-pdf-github/

1章から5章までが基礎編で今回はそこだけ読んでんですが、これだけでもMakefileに向き合うときの苦手意識がなくなったのでおすすめです。


まずはじめに、自分がMakefileの構成要素をなんと呼ぶのかわからず検索するときに苦労したのでそこから。(この情報は1章でわかります)

f:id:cloverrose:20200718105254p:plain
Makefile構成要素の名前

  • 作りたい・実行したいもののことをターゲット, target
  • targetが依存しているもののことを必須項目, prereq, prerequisite
  • 実行するもののことをコマンド,command
  • これらの塊をルール, rule と呼ぶ

次に今回自分がMakeをちゃんと勉強するまでわからず苦戦していた、GCPのCloud SpannerとLocalのSpanner Emulatorと接続先の切り替え方について紹介します。

Cloud SpannerのEmulatorはGCP本家が出しているこちらを使っています。Docker imageも提供されているのですぐに使い始めることができます。

github.com

GCPのCloud Spannerの中での接続先の切り替えは各種ツールのproject,instance,databaseのパラメータを変えることで可能ですが、 一方でGCPのCloud SpannerとLocal環境のEmulatorの接続先の切り替えは、こちらのドキュメントにあるようにexport SPANNER_EMULATOR_HOST=localhost:9010のように環境変数を設定すると、SpannerのクライアントライブラリはGCPにあるSpannerではなく、Localで動いているEmulatorに接続するようになります。

cloud.google.com

Cloud SpannerのMigrationツールとしてwrenchを使います。このツールもSPANNER_EMULATOR_HOST環境変数によってLocal環境を使うようになります。

github.com

そこで今回問題になったのが、1つのMakefileでLocalのEmulatorに対して使いたいコマンドとGCPのSpannerに対して使いたいコマンドがあるときに、Globalに環境変数をセットするとすべてがLocalに接続してしまうことです。

ここでGNU Make 第3版の3.5章「ターゲットとパターンに固有の変数」がまさに解決策でした。

f:id:cloverrose:20200718111526p:plain
ターゲット固有の変数の設定

このようにtarget:の横に変数の設定を記述するとそのルールと、このルールを実行するために必要なprerepのルールのスコープでだけ有効な変数を設定できます。

なのでdev/migrateGCPemulator/migrateはLocalのEmulatorに接続するように使い分けることができます。

注意点としてcommandsを設定する前はtarget: 変数設定ではなくてtarget:target:必須項目のどちらかである必要があります。(エラーがでます)

10日間大学時代の気分に戻ってLTLで仕様を書いてみた

会社のイベントで1週間何しててもいい期間があったので、大学時代に研究していた形式手法の1分野であるLTL仕様からオートマトン合成の分野に触れていました。

タイムライン

  • 以前使っていたAcacia+というツールがリンク切れになっていたことに驚く
  • LTLからオートマトン合成のコンペティションが2014年から開催されており使いやすく高性能なLTL合成ツールの発展に貢献していた
  • TLSFという生のLTLをラップした仕様形式がベンチマークの共通形式として普及していた。入出力のシグナルとオプションがLTLとセットで管理できる。
  • Strixというツールが2019年のコンペで1位になっていたのでOnlineでもページで遊んでみる
  • Strixが良さそうなのでDockerfile作ってインストールして触ってみる
  • Strixの内部でowlというLTLパースとωオートマトン変換をやるJavaライブラリ使っていて、便利な共通ライブラリできてることに感動
  • 仕様を書いて遊んでみるがなかなか欲しいオートマトンが出てこなくて苦戦
  • LTLオペレーターのUntilを誤用していてWeak Untilを使わないといけなかったことに気づく
  • 大学のときは仕様を書くこと自体はあまりやってなかったので大きな仕様を書くのは大変で便利なパーツを定義していって組み合わせることの重要性を認識する
  • Owlのonline demoでLTLからオートマトンを作って検証しながらパーツを作っていくとやりやすかった。学生の時よりωオートマトンそのものをちゃんとトレースした気がする
  • 会社のシステム開発で使えないかなーと思ってTimeout付きのシステムの仕様を書いた。
  • LTLから合成されるオートマトンを見ることで必要最低限な内部状態がわかるので、システム設計のときに無駄な状態作ってないか調べるのには使えるかも。
  • 上のが終わったので大学時代に一番書いてたエレベーターの仕様を書いた。何故か研究室ではエレベーターの仕様がよく使われていたが、簡単そうに見えて状態が複雑でちょうどいい題材だと気づいた。
  • 呼ばれてもないのに1F->2F->3F->2F->1Fとぐるぐるまわるエレベーターはすぐ書けたが、呼ばれてないときは止まってるエレベーターを書くのは1日かかった
  • 手元のMacBookProだと3F建てはできるけど、4F建てやリフト2機のエレベーターはすごく時間がかかる。
  • ボタンを押したらいつか来る(G(req -> F(grant)))仕様はできてもK時間以内に籠が来るエレベーターは3F建てが限界だった。
  • 4F建て1機の必要ないときは動かないエレベーターが作れた
  • 可視化してみた

成果物

github.com

感想

2014年から6年経ち2020年になったが、LTLからオートマトンを合成する計算オーダーが2-EXPなのでいくらツールが良くなっても大きい複雑な仕様は難しかった。

学生の時実現可能性判定の結果だけ見てたので、途中のωオートマトンを色々見てLTL仕様のデバッグをする体験は面白かった。

Spark版xgboostでRank学習できるようにPR投げた

機械学習で有名なxgboostというライブラリに先日PRがマージされた!嬉しい!

[jvm-packages] call setGroup for ranking task by cloverrose · Pull Request #2066 · dmlc/xgboost · GitHub

PRを送った経緯

  • 会社の業務で検索のランキングモデルを作っていて、xgboostへの移行を検討している。(現在進行形)
  • 会社にはHadoopクラスタがあり、ログはHadoop上に乗っている
  • なのでHadoop上でログの整形→学習が完結するとかなりクール
  • Hadoop上で動くxgboostはYARN版とSpark版がある
    • YARN版はHadoopクラスタにライブラリを追加しないといけないなどの理由でSpark版の方を採用
  • xgboost4j-sparkにはRank学習をサポートしていなかった
  • ただxgboost4j-scalaまでたどればsetGroupメソッドが用意されているので、Sparkからメソッドを呼ぶようにすればよいことがわかった

自分がコードの意図をちゃんと理解していない部分が何個かあったためレビューをたくさんしていただいた。

変更自体は9月くらいに実装して、機械学習のコンペで使っていたのですが、ついにPRを投げてそれがマージされたのは嬉しかった。

pywebhdfsにHAとFederationをサポートするPRがマージされた

WebHDFSについて

WebHDFShdfsコマンドではなくREST APIにhttpでアクセスできる便利なもの。

Hoop(httpfs)とwebhdfsの違い - たごもりすメモとかが図もあってわかりやすいと思う。

背景1 (hdfsコマンドへの不満)

MapReduceなどを使って解析を行って、その結果を可視化するとかを行っていると、スクリプトの中で頻繁にHDFSにアクセスすることになった。

そしてShellスクリプトが量産されるわけだが、自分はPythonスクリプトを書きたかった。

最初はsubprocessの中でhdfsコマンドを叩いていたが、レイテンシが結構あるし、lsコマンドの結果をパースしてファイル名を取得する必要があって微妙だった。

そんな話をしたらWebHDFSがあるよと教えてもらった。

背景2 (WebHDFSとの出会い)

WebHDFSに触ってみたら、レイテンシがすごく短いし、JSONが返ってくるので扱いやすい。心が踊った。

テンションが上ってHDFSのWebUIみたいなことができるシェルインタプリタを作った。 できる処理はls, cat, cd, pwdでread/get的なものだけにしていた。

その時自分でWebHDFSのREST APIのうすいラッパーを実装した。

これがわりと必要十分な機能は揃っていたためチームでもこのapi.pyが使われるようになった。

背景3 (自作ラッパーの限界)

自作ラッパーはシェルインタプリタ用に遊びで作っていたので、実業務で利用されるとカバーしているAPIが少なかった。

特に新規ファイル作成とかは、1回NameNodeにリクエストを投げて、返ってきたDataNodeのURIに対して再度ファイル内容とともにリクエストを投げるという形式で、自分で全部実装するのはめんどくさすぎた。

また認証周りもめんどくさくて、APIに渡す引数が増えていきそうだった。

そこでOSSPythonのWebHDFSラッパを探すことにした。

ライブラリ探し

PythonからHDFSを操作する - 偏った言語信者の垂れ流しに辿り着いた。

そこでは2つのライブラリが比較されていた。2013年の比較だが参考になった。

実際、PYPIを見てもWebHDFSライブラリはリリースが2014-01-20と古く、もうメンテナンスされていない気がした。

そこでpywebhdfsを使うことにした。しかしこのライブラリではHAとFederationがサポートされていなかった。

HAとFederationについて

HA(High Availability)とはHadoopのNameNodeが単一障害点だった欠点を解消するために、ActiveとStandbyという2つ以上のNameNodeを起動しておき、ActiveなNameNodeが落ちたらStandbyだったNameNodeが自動でActiveに切り替わるという仕組みで、実際の業務でHadoopを使うなら必須な機能。

Federationは複数のNameNodeがメタ情報(ディレクトリ構造とか)を分担してメモリに保持する仕組み。貧弱なメモリのNameNodeで大規模なクラスタを管理しようとすると全てのメタ情報が載り切らない。この時/data/以下はNameNode1で、/user/以下はNameNode2でそれ以外はNameNode3でという感じで分割できる仕組み。

どちらも自分たちのプロジェクトでは使っている。

現状pywebhdfsは1つのNameNodeのホスト名を渡すので、Activeが落ちた時に、APIはずっとエラーを返すようになってしまい、それを外側で検知して切り替えたりしないといけない。

HAとFederationのサポートの実装

いろいろ考えたが、pywebhdfsのIssueにもHAのサポートが欲しいという声が上がっていたので、PRを投げることにした。

しばらくの土日はいろんなパターンでHA/Federationをサポートする仕組みを実装した。

最終的には、パスにマッチする正規表現とそれに該当するNameNodeのリスト(HAならActiveとStandby)を順序付きの辞書でAPIコンストラクタに渡すことにした。

順序付きの辞書にしたのは、それ以外のパス(.*)とは最後にマッチさせたいからだ。

実装にあたって気をつけたのは、HA/Federationを利用していないユーザーには今までと同じインターフェースを保つこと。

そしてPRを投げた。

support federation and HA by cloverrose · Pull Request #22 · pywebhdfs/pywebhdfs · GitHub

もらったレビューを反映して、今朝マージされた :)

1ヶ月待っていたので感慨深かった。

f:id:cloverrose:20150718113204p:plain

余談

HDFSのシェルインタプリタは自作のapi.pyではなく、自分のパッチが当たったpywebhdfsを使って実装し直して現在も気に入って使っています。

Pythonインタプリタを作るときにはreadlineをラップしたcmdという便利なものがあります。

TABで補完とかが簡単に実装できるし、入力読み取り→実行のループも勝手にやってくれます。初めて知ったけど、今後また何か作るときに使っていきたい。