Open vSwitchのソースコードを読む(5) ovs-vswitchd の main loop

2013-05-17 by Daisuke Kotani

前回はovs-vswitchdのmain関数のloopの前まで見たので、今回はloopの中を見ていきます。

main loopの中身

existing という変数でループを抜けるかどうか判定しています。existingの変数はunixctlでexitコマンドが送られてきたときにtrueにセットされます。 あとは初期化していたいろんなものを順次走らせています。

exiting = false;
while (!exiting) {
    worker_run();
    if (signal_poll(sighup)) {
        vlog_reopen_log_file();
    }
    memory_run();
    if (memory_should_report()) {
        struct simap usage;

        simap_init(&usage);
        bridge_get_memory_usage(&usage);
        memory_report(&usage);
        simap_destroy(&usage);
    }
    bridge_run_fast();
    bridge_run();
    bridge_run_fast();
    unixctl_server_run(unixctl);
    netdev_run();

    worker_wait();
    signal_wait(sighup);
    memory_wait();
    bridge_wait();
    unixctl_server_wait(unixctl);
    netdev_wait();
    if (exiting) {
        poll_immediate_wake();
    }
    poll_block();
}

次のように分けて順に読んでいくつもりです。

  • worker_run, worker_wait
  • signal_poll, signal_wait
  • memory_run, memory_should_report, memory_report, memory_wait
  • unixctl_server_run, unixctl_server_wait
  • poll_immediate_wake, poll_block
  • bridge_run_fast, bridge_run, bridge_wait
  • netdev_run, netdev_wait

netdev、bridge_*、ofproto、ofproto-dpif関係での長い処理は、それぞれ別の回で読んでいくつもりです。

worker_run, worker_wait

これらの関数はlib/worker.cにあります。workerはOpen vSwitchのRPCを別プロセスで処理する仕組み(たぶん)で、worker_run関数はRPCのreplyを処理するものです。replyのメッセージをbufferから読み出して、worker_reply構造体の中のcallback関数を実行しています。

void
worker_run(void)
{
    if (worker_is_running()) {
        int error;

        error = rxbuf_run(&client_rx, client_sock,
                          sizeof(struct worker_reply));
        if (!error) {
            struct worker_reply *reply = client_rx.header.data;
            reply->reply_cb(&client_rx.payload, client_rx.fds,
                            client_rx.n_fds, reply->reply_aux);
            rxbuf_clear(&client_rx);
        } else if (error != EAGAIN) {
            worker_broke();
            VLOG_ABORT("receive from worker failed (%s)",
                       ovs_retval_to_string(error));
        }
    }
}

ちなみに、worker_request構造体とworker_reply構造体は以下の定義になっています。どちらもlib/worker.cに書かれています。

/* Header for an RPC request. */
struct worker_request {
    size_t request_len;              /* Length of the payload in bytes. */
    worker_request_func *request_cb; /* Function to call in worker process. */
    worker_reply_func *reply_cb;     /* Function to call in main process. */
    void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
};

/* Header for an RPC reply. */
struct worker_reply {
    size_t reply_len;            /* Length of the payload in bytes. */
    worker_reply_func *reply_cb; /* Function to call in main process. */
    void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
};

worker_wait関数は、新しいRPC replyが入ってきたときにpoll loopを起こすものです。poll loopがどう管理されているのかはpoll loopのところで詳しく見ていきます。

signal_poll, signal_wait

signal_poll関数は、引数で指定したsignalを受け取っていたらtrue、そうでなければfalseを返すものです。

/* Returns true if signal 's' has been received since the last call to this
 * function with argument 's'. */
bool
signal_poll(struct signal *s)
{
    char buf[_POSIX_PIPE_BUF];
    ignore(read(fds[0], buf, sizeof buf));
    if (signaled[s->signr]) {
        signaled[s->signr] = 0;
        return true;
    }
    return false;
}

シグナルを受け取ったときはsignal_handlerが処理して、signald配列の該当するシグナルをtrueにしています。signal_pollはこれを見てreturn valueを決めています。

static void
signal_handler(int signr)
{
    if (signr >= 1 && signr < N_SIGNALS) {
        ignore(write(fds[1], "", 1));
        signaled[signr] = true;
    }
}

signal_wait関数は、新しいシグナルがあったらpoll loopを起こすものです。詳しくはpoll loopのところで。

memory_run, memory_should_report, memory_report, memory_wait

これらの関数は、lib/memory.cにあります。メモリの使用率を定期的に報告するものです。memory_runは報告する時間かどうかの判定、memory_should_reportは判定結果を返り値とする関数、memory_reportはunixctlを通して管理用のコマンドに使用率を送るものです。メモリ使用率のデータの収集はbridge_get_memory_usage関数内で行われています。

unixctl_server_run, unixctl_server_wait

これらの関数は、unixctlで管理用のプログラムからのコネクションを受け付け、コマンドを処理し、応答を返すものです。lib/unixctl.cにあります。 コマンドやデータはJSONで書かれていて、データを受信したあとJSONのパース、次に登録されたコマンドの実行、そのあと実行結果を返す、という流れになっています。ここら辺は深入りしたくないので、これぐらいにしておきます。

poll_immediate_wake, poll_block

これらの関数はlib/poll-loop.cにあります。

poll_blockは何か*_waitで指定したものが起きるかタイムタイムアウトするまでblockする関数です。何かイベントを待っているfile descriptorのリストはwaitersというリストにあります。 time_pollが実際に待っている関数です。この中ではpollが呼ばれています。

/* Blocks until one or more of the events registered with poll_fd_wait()
 * occurs, or until the minimum duration registered with poll_timer_wait()
 * elapses, or not at all if poll_immediate_wake() has been called. */
void
poll_block(void)
{
    static struct pollfd *pollfds;
    static size_t max_pollfds;

    struct poll_waiter *pw, *next;
    int n_waiters, n_pollfds;
    int elapsed;
    int retval;

    /* Register fatal signal events before actually doing any real work for
     * poll_block. */
    fatal_signal_wait();

    n_waiters = list_size(&waiters);
    if (max_pollfds < n_waiters) {
        max_pollfds = n_waiters;
        pollfds = xrealloc(pollfds, max_pollfds * sizeof *pollfds);
    }

    n_pollfds = 0;
    LIST_FOR_EACH (pw, node, &waiters) {
        pw->pollfd = &pollfds[n_pollfds];
        pollfds[n_pollfds].fd = pw->fd;
        pollfds[n_pollfds].events = pw->events;
        pollfds[n_pollfds].revents = 0;
        n_pollfds++;
    }

    if (timeout_when == LLONG_MIN) {
        COVERAGE_INC(poll_zero_timeout);
    }
    retval = time_poll(pollfds, n_pollfds, timeout_when, &elapsed);
    if (retval < 0) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_ERR_RL(&rl, "poll: %s", strerror(-retval));
    } else if (!retval) {
        log_wakeup(timeout_where, NULL, elapsed);
    }

    LIST_FOR_EACH_SAFE (pw, next, node, &waiters) {
        if (pw->pollfd->revents) {
            log_wakeup(pw->where, pw->pollfd, 0);
        }
        poll_cancel(pw);
    }

    timeout_when = LLONG_MAX;
    timeout_where = NULL;

    /* Handle any pending signals before doing anything else. */
    fatal_signal_run();
}

poll_immediate_wakeはexistingがtrueのときに実行されます。poll_blockのtimeoutを0に設定していて、プログラムを終了するときにblockされることがないようにということだと思います。

次回以降の予定

後回しにしている、netdev、bridge、ofproto、ofproto-dpifを順に読んでいきます。



このエントリーをはてなブックマークに追加