Yanyg - Software Engineer

协程技术

目录

协程是非抢占式的多任务调度, 用于简化异步代码的实现, 可用同步方式写异步代码, 其代价是协程切换的成本, 以及协程函数对阻塞API的使用限制.

协程分为有栈协程和无栈协程. 有栈协程可以在任意函数调用里挂起. 无栈协程通过语言提供的特定关键词在特定位置挂起. Platform实现有栈协程.

协程适合用于逻辑复杂的控制链路. 例如, 分区加载.

1 从示例说起

考虑KVPartition的加载过程:

  1. KVServer收到Partition加载请求;
  2. KVServer获取Partition保护锁, 防止对Partition的并发操作;
  3. KVServer List/Stat Partition目录, 获取基本信息;
  4. KVServer加载Partition Meta数据;

其中每一步操作都涉及IO, 但线程还要服务其他Partition, 因此基于异步IO实现.

这类操作的典型实现有三种: 异步回调, 状态机加异步回调, 协程.

异步回调实现异步逐层嵌套, 逻辑复杂, 代码易出错. 如下是部分实现伪码:

void KVPartition::Load(ClosureRInt *done)
{
  // do some sync non-block task, then call acuqireLock to acuqure Lock
  ClosureRInt *loadDone = NewClosure(this, KVPartition::onLoadDone, done);
  acquireLock(loadDone);
}

void KVPartition::acquireLock(ClosureRInt *done)
{
  // PR/Cas lock, like as zk, chubby
  AcquireLockContext *ctx = new AcquireLockContext(done);
  Closure *acquireLockDone =
    NewClosure(this, &KVPartition::onAcuqireLockDone, ctx);

  ctx->ctrl.SetTimeout(5000 * 1000);
  mClient->Read(&ctx->ctrl, &ctx->content, acquireLockDone);
}

void KVPartition::onAcquireLockDone(AcquireLockContext *ctx)
{
  ScopedContext _(ctx); // auto free.

  if (!ctx->ctrl.IsOk()) {
    ctx->done->SetResult(ctx->ctrl.GetResult());
    ctx->done->Run();
    return;
  }

  // acquire lock success, list task.
  ListDirContext *ctx = new ListDirContext(ctx->done);
  Closure *listDone = NewClosure(this, &KVPartition::onListDirDone, ctx->done);
  ctx->ctrl.SetTimeout(2000 * 1000);
  mClient->ReadDir(&ctx->ctrl, &ctx->entries, listDone);
}

状态机加异步回调方式相对较好:

void KVPartition::Load(ClosureRInt *done)
{
  if (LOAD_INVALID != mState) {
    done->SetResult(EBUSY);
    return;
  }

  mLoadDone = done;
  mState = LOAD_INIT;
  doLoad();
}

void KVPartition::doLoad()
{
  if (OK != mLoadResult) {
    LOG_ERROR("LoadFailed: error %d in state %s",
              mLoadReuslt, ToString(mState));
    mLoadDone->SetResult(mLoadResult);
    mLoadDone->Run();
    return;
  }

  switch (mState) {
  case LOAD_INIT:
    doAcquireLock();
    break;
  case LOAD_LIST:
    doListDir();
    break;
  case LOAD_READMETA:
    doReadMeta();
    reak;
  case LOAD_DONE:
    mLoadDone->SetResult(OK);
    mLoadDone->Run();
    break;
  }
}

void KVPartition::doAcquireLock()
{
  // PR/Cas lock, like as zk, chubby
  AcquireLockContext *ctx = new AcquireLockContext(done);
  Closure *acquireLockDone =
    NewClosure(this, &KVPartition::onAcuqireLockDone, ctx);

  ctx->ctrl.SetTimeout(5000 * 1000);
  mClient->Read(&ctx->ctrl, &ctx->content, acquireLockDone);
}

void KVPartition::onAcquireLockDone(AcquireLockContext *ctx)
{
  ScopedContext _(ctx); // auto free.

  mLoadResult = ctx->ctrl.GetResult();
  if (OK == mLoadResult) {
    mLoadState = nextLoadState();
  }

  doLoad();
}

协程实现是最简洁的. 同时考虑加载分区的频率低, 协程切换的成本在可接受范围内:

void KVPartition::Load(ClosureRInt *done)
{
  if (nullptr == GetCurrentCoroutine()) {
    InvokeCoroutineInCurrentThread(NewClosure(this, &KVPartition::Load, done));
    return;
  }

  ScopedDone scopedDone(done);

  int ret = doAcuqireLock();
  if (OK != ret) {
    done->SetResult(ret);
    return;
  }

  ret = doListDir();
  if (OK != ret) {
    return;
  }

  // more ...
}

2 协程实现

TODO. 太长了, 抽时间再写完整的吧.

协程包括协程调度, 协程挂起与恢复, 协程同步, 协程与线程的融合等是想.

3 协程参考