HUAHUA

Daily-Note 10-04

OVERVIEW

  1. Spark concurrency model: Spark uses Executor processes as its unit of execution and runs tasks concurrently on threads inside them.
  2. Linear models: classical machine-learning models.
  3. Trie: a prefix tree, useful for route matching, autocompletion, and similar scenarios.
  4. Lagrange multipliers: a method in mathematical optimization for finding local extrema of a multivariate function subject to constraints.

Spark Concurrency Model

RDD

RDD stands for Resilient Distributed Dataset; it is the basic way Spark organizes data.

// -- RDD.scala -- //
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

def compute(split: Partition, context: TaskContext): Iterator[T]

Calling iterator() triggers the RDD's computation. getOrCompute and computeOrReadCheckpoint implement caching of computed results, while the actual computation ultimately happens in compute(), which each concrete RDD subclass implements.
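
As a hedged illustration (not from Spark's source; the class and partition names below are made up), a minimal custom RDD only needs to describe its partitions and implement compute():

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: it only carries its index.
case class ToyPartition(index: Int) extends Partition

// Hypothetical RDD that splits a local Seq[Int] across numSlices partitions.
class ToyRDD(sc: SparkContext, data: Seq[Int], numSlices: Int)
  extends RDD[Int](sc, Nil) {

  // Describe the partitions of this RDD.
  override def getPartitions: Array[Partition] =
    (0 until numSlices).map(i => ToyPartition(i): Partition).toArray

  // compute() does the per-partition work; iterator() ends up here when
  // there is no cached or checkpointed data for the partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    data.iterator.zipWithIndex
      .collect { case (v, i) if i % numSlices == split.index => v }
}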

TreeNode

How to explain TreeNode type restriction and self-type in Spark’s TreeNode?

TreeNode is the data structure Spark uses to organize its computation flow. Its subclasses include Expression and QueryPlan; QueryPlan is implemented by LogicalPlan and SparkPlan, where the former is the logical plan and the latter is the actual physical execution plan.

// -- SparkPlan.scala -- //
final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  doExecute()
}

protected def doExecute(): RDD[InternalRow]

The execute() method produces a new RDD; each subclass provides the concrete behaviour by implementing doExecute(). FileSourceScanExec's doExecute() implements reading and converting data from the underlying data source, while HashAggregateExec's doExecute() implements the computation that transforms data coming from the upstream RDD into the next RDD.

A doExecute() implementation typically calls one of the mapPartitions() family of methods on the upstream RDD, passing in a transformation function f: Iterator[T] => Iterator[U] to construct the new RDD; f is then invoked inside that RDD's compute() method.

In this way only the final RDD needs to be constructed up front: calling its iterator() method invokes the concrete compute() methods, which pull data from their parent RDDs, carry out the transformations, and produce the target RDD.
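
A simplified sketch of that mechanism (abridged from Spark's RDD.scala and MapPartitionsRDD.scala; closure cleaning and other details are elided):

// -- RDD.scala (abridged) -- //
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

// -- MapPartitionsRDD.scala (abridged) -- //
// compute() pulls the parent's iterator and applies the transformation f,
// so calling iterator() on the last RDD walks the whole lineage.
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))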

Shuffle

Spark SQL Query Engine Deep Dive (16) – ShuffleExchangeExec & UnsafeShuffleWriter

The purpose of shuffle is to redistribute the records of the input RDD's partitions, mapping them into the partitions specified by the output RDD.
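
For example (a hedged sketch, assuming sc is a SparkContext; the data is illustrative), reduceByKey inserts a ShuffleDependency between the input and output RDDs so that all records with the same key land in the same output partition:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)), numSlices = 4)
// Summing per key requires all records of a key in one place, so a shuffle
// re-partitions the records of `pairs` by key before the reduction runs.
val sums = pairs.reduceByKey(_ + _)
sums.collect() // Array(("a", 3), ("b", 1)), in some order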

SQL Submission Flow

The sql() method calls parsePlan() to create a LogicalPlan and returns a Dataset built from that plan.

// -- main.scala -- //
spark.sql("select avg(age), max(age) from people").show()

// -- SparkSession.scala -- //
def sql(sqlText: String): DataFrame = withActive {
  val tracker = new QueryPlanningTracker
  val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
    sessionState.sqlParser.parsePlan(sqlText)
  }
  Dataset.ofRows(self, plan, tracker)
}

The plan is not executed inside sql() itself; computation is kicked off by show(), i.e. only when the result is actually needed. show() eventually calls executeCollect() to collect the results of executing the plan.

The execute() method mainly builds the RDD transformation chain recursively, describing how each RDD is computed; the actual job is then submitted via sc.runJob().

// -- SparkPlan.scala -- //
def executeCollect(): Array[InternalRow] = {
  val byteArrayRdd = getByteArrayRdd()

  val results = ArrayBuffer[InternalRow]()
  byteArrayRdd.collect().foreach { countAndBytes =>
    decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
  }
  results.toArray
}

private def getByteArrayRdd(
    n: Int = -1, takeFromEnd: Boolean = false): RDD[(Long, Array[Byte])] = {
  execute() // execute() recursively builds the RDD chain, describing the computation
    .mapPartitionsInternal { iter =>
      var count = 0
      // ... encode each partition's rows into a (count, byte array) pair
    }
}

// -- RDD.scala -- //
def collect(): Array[T] = withScope {
  // sc.runJob is called here to submit the job
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

sc.runJob() eventually calls dagScheduler.runJob(), which in turn calls submitJob() to wrap the job as a JobSubmitted (a case class) and push it onto the eventProcessLoop, where it is handled by the doOnReceive() method of the DAGSchedulerEventProcessLoop implementation.

def runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): Unit

In doOnReceive(), the JobSubmitted event is matched and dispatched to handleJobSubmitted(). That method mainly builds the finalStage and then calls submitStage() to submit it.
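
The dispatch looks roughly like this (paraphrased from DAGScheduler.scala; only the JobSubmitted case is shown):

// -- DAGScheduler.scala (abridged) -- //
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  // ... other events: MapStageSubmitted, StageCancelled, ExecutorLost, ...
}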

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
   // ...
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
 // ...
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
      Utils.cloneProperties(properties)))
  submitStage(finalStage)
}

In submitStage(), getMissingParentStages() uses the RDD dependencies to work out which stages the current stage depends on and recursively submits those parent stages first; submitMissingTasks() then takes care of submitting each stage whose parents are all available.

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

submitMissingTasks() first performs the pre-processing for task execution, such as computing preferred task locations, then serializes the task binary, generates the corresponding tasks for shuffle and result stages, and finally calls taskScheduler.submitTasks() to submit them. Once the current stage's tasks have finished, the stage is marked as complete and the stages in the waiting queue are submitted.

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  // ...
  
  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null // broadcast of the serialized task
  var partitions: Array[Partition] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    var taskBinaryBytes: Array[Byte] = null
    // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
    // consistent view of both variables.
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      partitions = stage.rdd.partitions
    }

    if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
      logWarning(s"Broadcasting large task binary with size " +
        s"${Utils.bytesToString(taskBinaryBytes.length)}")
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // ...
  }

  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          
          // build a ShuffleMapTask
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          
          // build a ResultTask
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    // ...
  }

  if (tasks.nonEmpty) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    // submit the tasks
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
      stage.resourceProfileId))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage : ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}

submitTasks() places the TaskSet under the management of the schedulableBuilder and then calls backend.reviveOffers() to send an event. In local mode the backend implementation is LocalSchedulerBackend; in a typical distributed deployment it is CoarseGrainedSchedulerBackend (literally, a coarse-grained scheduler backend).

override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
    + "resource profile " + taskSet.resourceProfileId)
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])

    // Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
    // This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
    // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
    // and it completes. TSM2 finishes tasks for partition 1-9, and thinks he is still active
    // because partition 10 is not completed yet. However, DAGScheduler gets task completion
    // events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
    // and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a
    // TSM3 for it. As a stage can't have more than one active task set managers, we must mark
    // TSM2 as zombie (it actually is).
    stageTaskSets.foreach { case (_, ts) =>
      ts.isZombie = true
    }
    stageTaskSets(taskSet.stageAttemptId) = manager
    // add the TaskSetManager to the scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run(): Unit = {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // send the revive-offers event
  backend.reviveOffers()
}

In LocalSchedulerBackend, the local RpcEndpoint's receive() method handles the ReviveOffers event by calling reviveOffers(), which calls executor.launchTask() to run tasks concurrently; the latter puts each task into a thread pool for execution.

As can be seen, Spark implements its task concurrency model at the thread level, whereas (as I understand it) Hadoop runs each task as a separate process.

// -- LocalSchedulerBackend.scala -- //
def reviveOffers(): Unit = {
  // local mode doesn't support extra resources like GPUs right now
  val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
    Some(rpcEnv.address.hostPort)))
  for (task <- scheduler.resourceOffers(offers, true).flatten) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, task)
  }
}

// -- Executor.scala -- //
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription, plugins)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
  if (decommissioned) {
    log.error(s"Launching a task while in decommissioned state.")
  }
}

TaskRunner is the Runnable wrapper around a task; its run() method calls task.run(), which dispatches to the concrete runTask() implementation of either ShuffleMapTask or ResultTask.

In ShuffleMapTask, dep.shuffleWriterProcessor.write() writes the intermediate results of the shuffle stage out to files. Currently the only ShuffleManager providing getWriter there is SortShuffleManager; the writer it returns writes the results of rdd.iterator(partition, context) out to files.

In ResultTask, func(context, rdd.iterator(partition, context)) produces the result, where func is supplied by the caller of sc.runJob(). In this example it is the function passed in collect() above, val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray), which turns the RDD's final output into an Array.

// -- ShuffleMapTask.scala -- //
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  val rdd = rddAndDep._1
  val dep = rddAndDep._2
  // While we use the old shuffle fetch protocol, we use partitionId as mapId in the
  // ShuffleBlockId construction.
  val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
    partitionId
  } else context.taskAttemptId()
  dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
}

// -- ResultTask.scala -- //
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  func(context, rdd.iterator(partition, context))
}

This concludes the Spark SQL submission flow.

Summary

A SQL string is parsed into a LogicalPlan and wrapped in a Dataset; when a result is needed, the SparkPlan's execute() builds the RDD chain, collect() submits the job through sc.runJob(), the DAGScheduler splits it into stages and tasks, the TaskScheduler hands the TaskSet to the backend, and the Executor runs each task on a thread from its pool.

Linear Models

CS229 Syllabus

§1 Prerequisites

§1.1 Maximum Likelihood Estimation (MLE)

Likelihood function (Chinese Wikipedia)

Likelihood function (Wikipedia)

Maximum likelihood estimation (Chinese Wikipedia)

§1.2 Matrix Derivatives

§1.3 The Exponential Family

Exponential family (Baidu Baike)

Exponential family Wiki

§1.4 Multinomial Distribution

Multinomial distribution (Baidu Baike)

The probability mass function of the multinomial distribution is:

$$ P\left(X_{1}=n_{1},X_{2}=n_{2},\cdots,X_{k}=n_{k}\right)=\dfrac{n!}{n_{1}!\,n_{2}!\cdots n_{k}!}\prod_{i=1}^{k}p_{i}^{n_{i}},\quad \sum_{i=1}^{k}n_{i}=n $$

§2 Linear Model

Hypothesis function:

$$ h_\theta(x) = \theta^{\mathrm{T}}x $$

Cost function (LMS):

$$ J(\theta) = \frac{1}{2} \sum\limits_{i=1}^{n} (h_\theta(x_i) - y_i)^2 $$

Method 1: adjust the parameters $\theta$ by gradient descent.

First, compute the gradient direction (for a single training example):

$$ \begin{array}{rcl} \frac{\partial}{\partial\theta_j}J(\theta) &=&\frac{\partial}{\partial\theta_j}\frac{1}{2}\left(h_\theta(x)-y\right)^2\\ &=&2\cdot\frac{1}{2}\left(h_\theta(x)-y\right)\cdot\frac{\partial}{\partial\theta_j}\left(h_\theta(x)-y\right)\\ &=&\left(h_\theta(x)-y\right)\cdot\frac{\partial}{\partial\theta_j}\left(\sum_{i=0}^{d}\theta_ix_i-y\right)\\ &=&\left(h_\theta(x)-y\right)x_j \end{array} $$

Then iterate, adjusting $\theta$ according to:

$$ \theta_j:=\theta_j+\alpha\left(y^{(i)}-h_\theta(x^{(i)})\right)x_j^{(i)} $$
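
As a hedged sketch (plain Scala rather than Spark; the function and variable names are made up), the stochastic gradient descent (LMS) update above can be implemented as:

// Hypothetical example: stochastic gradient descent for linear regression.
// x: feature vectors with x(i)(0) = 1 for the intercept, y: targets, alpha: learning rate.
def lmsSgd(x: Array[Array[Double]], y: Array[Double],
           alpha: Double, epochs: Int): Array[Double] = {
  val theta = Array.fill(x.head.length)(0.0)
  for (_ <- 0 until epochs; i <- x.indices) {
    val h = theta.zip(x(i)).map { case (t, xi) => t * xi }.sum // h_theta(x^(i)) = theta^T x^(i)
    val err = y(i) - h
    for (j <- theta.indices)
      theta(j) += alpha * err * x(i)(j) // theta_j := theta_j + alpha * (y^(i) - h) * x_j^(i)
  }
  theta
}

// Usage: fit y ≈ 1 + 2x on a toy data set; theta converges to roughly Array(1.0, 2.0).
val xs = Array(Array(1.0, 0.0), Array(1.0, 1.0), Array(1.0, 2.0))
val ys = Array(1.0, 3.0, 5.0)
val theta = lmsSgd(xs, ys, alpha = 0.1, epochs = 200)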

Method 2: obtain the optimal parameters directly via the closed-form (normal equation) solution.

First compute the gradient of the cost:

$$ \begin{array}{rcl} \nabla_\theta J(\theta) &=&\nabla_\theta\frac{1}{2}(X\theta-\vec{y})^T(X\theta-\vec{y})\\ &=&\frac{1}{2}\nabla_\theta\left((X\theta)^T X\theta-(X\theta)^T\vec{y}-\vec{y}^T(X\theta)+\vec{y}^T\vec{y}\right)\\ &=&\frac{1}{2}\nabla_\theta\left(\theta^T(X^T X)\theta-\vec{y}^T(X\theta)-\vec{y}^T(X\theta)\right)\\ &=&\frac{1}{2}\nabla_\theta\left(\theta^T(X^T X)\theta-2(X^T\vec{y})^T\theta\right)\\ &=&\frac{1}{2}\left(2X^T X\theta-2X^T\vec{y}\right)\\ &=&X^T X\theta-X^T\vec{y} \end{array} $$

Setting $\nabla_\theta J(\theta) = 0$ gives

$$ \theta = (X^TX)^{-1}X^T \vec{y} $$

Probabilistic interpretation: why choose $J(\theta) = \frac{1}{2} \sum\limits_{i=1}^{n} (h_\theta(x_i) - y_i)^2$ as the cost function?

CS229学习笔记之概率解释与局部加权线性回归
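
Briefly, following the standard CS229 argument: assume $y^{(i)} = \theta^T x^{(i)} + \epsilon^{(i)}$ with i.i.d. noise $\epsilon^{(i)} \sim \mathcal{N}(0,\sigma^2)$. Maximizing the log-likelihood of $\theta$ is then equivalent to minimizing $J(\theta)$:

$$ \ell(\theta)=\log\prod_{i=1}^{n}\frac{1}{\sqrt{2\pi}\,\sigma}\exp\left(-\frac{(y^{(i)}-\theta^Tx^{(i)})^2}{2\sigma^2}\right)=n\log\frac{1}{\sqrt{2\pi}\,\sigma}-\frac{1}{\sigma^2}\cdot\frac{1}{2}\sum_{i=1}^{n}\left(y^{(i)}-\theta^Tx^{(i)}\right)^2 $$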

§3 Logistic Regression

Following maximum likelihood estimation: in logistic regression, $y$ given $x$ is assumed to follow a Bernoulli distribution, with probability

$$ p(y|x;\theta)=(h_\theta(x))^y(1-h_\theta(x))^{1-y} $$

We then maximize the likelihood $L(\theta)$,

$$ \begin{array}{rcl} L(\theta) &=&p(\vec{y}\mid X;\theta)\\ &=&\prod\limits_{i=1}^np(y^{(i)}\mid x^{(i)};\theta)\\ &=&\prod\limits_{i=1}^n\left(h_\theta(x^{(i)})\right)^{y^{(i)}}\left(1-h_\theta(x^{(i)})\right)^{1-y^{(i)}} \end{array} $$

For convenience, take the logarithm of $L(\theta)$:

$$ \begin{array}{rcl} \ell(\theta) &=&\log L(\theta)\\ &=&\sum_{i=1}^n y^{(i)}\log h(x^{(i)})+(1-y^{(i)})\log(1-h(x^{(i)})) \end{array} $$

Now differentiate $\ell(\theta)$. Here $g(\cdot)$ is the sigmoid function $g(z)=\frac{1}{1+e^{-z}}$ (the canonical response function, i.e. the inverse of the link function), whose derivative is exactly $g'(z)=g(z)(1-g(z))$.
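
The derivative identity follows from a quick computation:

$$ g'(z)=\frac{d}{dz}\,\frac{1}{1+e^{-z}}=\frac{e^{-z}}{(1+e^{-z})^2}=\frac{1}{1+e^{-z}}\left(1-\frac{1}{1+e^{-z}}\right)=g(z)\left(1-g(z)\right) $$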

$$ \begin{array}{rcl} \frac{\partial}{\partial\theta_j}\ell(\theta) &=&\left(y\frac{1}{g(\theta^Tx)}-(1-y)\frac{1}{1-g(\theta^Tx)}\right)\frac{\partial}{\partial\theta_j}g(\theta^Tx)\\ &=&\left(y\frac{1}{g(\theta^Tx)}-(1-y)\frac{1}{1-g(\theta^Tx)}\right)g(\theta^Tx)(1-g(\theta^Tx))\frac{\partial}{\partial\theta_j}\theta^Tx\\ &=&\left(y(1-g(\theta^Tx))-(1-y)g(\theta^Tx)\right)x_j\\ &=&\left(y-h_\theta(x)\right)x_j \end{array} $$

Then update $\theta$ by the gradient ascent rule:

$$ \theta_j:=\theta_j+\alpha\left(y^{(i)}-h_\theta(x^{(i)})\right)x_j^{(i)} $$

Formally this is identical to the update rule for the linear model; the difference lies in $h_\theta$: in the linear model $h_\theta(x)=\omega x+b$, whereas here $h_\theta(x)=g(\omega x+b)=\frac{1}{1+e^{-(\omega x+b)}}$.

§4 Generalized Linear Models (GLMs)

§4.1 Softmax Regression

Softmax regression is used for multi-class classification. Here we assume that in the multi-class setting $y$ follows an exponential-family distribution, i.e. $y\mid x;\theta \sim \mathrm{ExponentialFamily}(\eta)$, and that this exponential-family distribution is the multinomial distribution.

$$ \begin{array}{rcl} p(y|x;\theta) &=&\phi_1^{1\{y=1\}}\phi_2^{1\{y=2\}}\phi_3^{1\{y=3\}}\cdots\phi_k^{1\{y=k\}}\\ &=&\phi_1^{1\{y=1\}}\phi_2^{1\{y=2\}}\phi_3^{1\{y=3\}}\cdots\phi_k^{1-\sum_{i=1}^{k-1}1\{y=i\}}\\ &=&\phi_1^{(T(y))_1}\phi_2^{(T(y))_2}\phi_3^{(T(y))_3}\cdots\phi_k^{1-\sum_{i=1}^{k-1}(T(y))_i}\\ &=&\exp\left((T(y))_1\log(\phi_1)+(T(y))_2\log(\phi_2)+\cdots+\left(1-\sum_{i=1}^{k-1}(T(y))_i\right)\log(\phi_k)\right)\\ &=&\exp\left((T(y))_1\log\left(\tfrac{\phi_1}{\phi_k}\right)+(T(y))_2\log\left(\tfrac{\phi_2}{\phi_k}\right)+\cdots+(T(y))_{k-1}\log\left(\tfrac{\phi_{k-1}}{\phi_k}\right)+\log(\phi_k)\right)\\ &=&b(y)\exp\left(\eta^{T}T(y)-a(\eta)\right) \end{array} $$

Matching this against the exponential-family form gives

$$ \begin{array}{rcl} \eta &=&\begin{bmatrix} \log(\frac{\phi_1}{\phi_k})\\ \log(\frac{\phi_2}{\phi_k})\\ \vdots\\ \log(\frac{\phi_{k-1}}{\phi_k}) \end{bmatrix}\\ a(\eta)&=&-\log(\phi_k)\\ b(y)&=&1 \end{array} $$

Here $T(y)$ is the vector of indicators shown above, and under the GLM assumption $h_\theta(x)=\mathrm{E}[T(y)\mid x;\theta]$ we have

$$ \begin{array}{rcl} h_{\theta}(x) &=&\mathrm{E}[T(y)\mid x;\theta]\\ &=&\mathrm{E}\left[\left.\begin{matrix} 1\{y=1\}\\ 1\{y=2\}\\ \vdots\\ 1\{y=k-1\} \end{matrix}\;\right|\;x;\theta\right]\\ &=&\begin{bmatrix} \phi_1\\ \phi_2\\ \vdots\\ \phi_{k-1} \end{bmatrix}\\ &=&\begin{bmatrix} \frac{\exp(\theta_1^Tx)}{\sum_{j=1}^{k}\exp(\theta_j^Tx)}\\ \frac{\exp(\theta_2^Tx)}{\sum_{j=1}^{k}\exp(\theta_j^Tx)}\\ \vdots\\ \frac{\exp(\theta_{k-1}^Tx)}{\sum_{j=1}^{k}\exp(\theta_j^Tx)} \end{bmatrix} \end{array} $$

Plugging $h_\theta(x)$ into the multinomial probability mass function, the log-likelihood to maximize over $\theta$ is

$$ \begin{array}{rcl}\ell(\theta) &=&\sum\limits_{i=1}^n\log p(y^{(i)}|x^{(i)};\theta)\\ &=&\sum\limits_{i=1}^n\log\prod\limits_{l=1}^k\left(\dfrac{e^{\theta_l^Tx^{(i)}}}{\sum\limits_{j=1}^ke^{\theta_j^Tx^{(i)}}}\right)^{1\{y^{(i)}=l\}}\end{array} $$

It can be maximized by gradient ascent or Newton's method.
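
For reference, a hedged Scala sketch (illustrative only, not tied to any library) of computing the class probabilities $\phi_l$ for a single example:

// Hypothetical example: phi_l = exp(theta_l^T x) / sum_j exp(theta_j^T x).
def softmaxProbs(thetas: Array[Array[Double]], x: Array[Double]): Array[Double] = {
  val scores = thetas.map(t => t.zip(x).map { case (ti, xi) => ti * xi }.sum)
  val m = scores.max // subtract the max score for numerical stability
  val exps = scores.map(s => math.exp(s - m))
  val z = exps.sum
  exps.map(_ / z)
}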

Trie

Trie (OI-Wiki)
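
A minimal sketch of a trie in Scala (illustrative, not from any particular library), supporting the insert and prefix-lookup operations used in route matching and autocompletion:

import scala.collection.mutable

class Trie {
  // Each node holds its children keyed by character and a word-terminator flag.
  private class Node {
    val children = mutable.Map.empty[Char, Node]
    var isWord = false
  }
  private val root = new Node

  def insert(word: String): Unit = {
    var cur = root
    for (c <- word) cur = cur.children.getOrElseUpdate(c, new Node)
    cur.isWord = true
  }

  // True if exactly this word was inserted.
  def contains(word: String): Boolean = find(word).exists(_.isWord)

  // True if any inserted word starts with the given prefix.
  def startsWith(prefix: String): Boolean = find(prefix).isDefined

  // Walk the tree character by character; None if the path breaks.
  private def find(s: String): Option[Node] =
    s.foldLeft(Option(root)) {
      case (Some(node), c) => node.children.get(c)
      case (None, _)       => None
    }
}

// Usage: val t = new Trie; t.insert("spark"); t.startsWith("sp") // true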

Lagrange Multipliers

Lagrange multiplier (Wikipedia)

Advanced Mathematics II, Tongji University

Extrema of a function of two variables

The method of Lagrange multipliers

The problem of finding an extremum of the objective function

$$ z=f(x,y) $$

subject to the constraint

$$ \varphi(x,y)=0 $$

is equivalent to finding the extrema (stationary points) of the Lagrangian

$$ \mathcal{L}(x,y,\lambda)=f(x,y)-\lambda\cdot\varphi(x,y) $$
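
A quick worked example: maximize $f(x,y)=xy$ subject to $\varphi(x,y)=x+y-1=0$.

$$ \mathcal{L}(x,y,\lambda)=xy-\lambda(x+y-1),\qquad \begin{cases} \frac{\partial\mathcal{L}}{\partial x}=y-\lambda=0\\ \frac{\partial\mathcal{L}}{\partial y}=x-\lambda=0\\ \frac{\partial\mathcal{L}}{\partial\lambda}=-(x+y-1)=0 \end{cases} \;\Rightarrow\; x=y=\lambda=\tfrac{1}{2},\quad f\left(\tfrac{1}{2},\tfrac{1}{2}\right)=\tfrac{1}{4} $$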

Derivation

Proof

Applications