Daily-Note 10-04
OVERVIEW
- Spark concurrency model: Spark uses Executor processes as its unit of execution and runs tasks concurrently on threads within each Executor.
- Linear models: classic machine learning models.
- Trie: a prefix tree, useful for route matching, autocompletion, and similar scenarios.
- Lagrange multipliers: a method in mathematical optimization for finding local extrema of a multivariate function subject to constraints.
Spark Concurrency Model
RDD
RDD stands for Resilient Distributed Dataset, the fundamental way Spark organizes data.
// -- RDD.scala -- //
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
def compute(split: Partition, context: TaskContext): Iterator[T]
Calling iterator() triggers the RDD's computation. getOrCompute and computeOrReadCheckpoint implement the caching of computed results; the actual work ultimately lands in compute(), which each RDD subclass implements with its concrete computation.
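As a hedged illustration of that contract (a made-up example, not Spark source; the class name and doubling logic are invented), a minimal RDD subclass only has to describe its partitions and implement compute(), while caching and checkpoint handling stay in the final iterator():
// -- Example.scala (hypothetical) -- //
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
// A minimal custom RDD: getPartitions reuses the parent's layout and
// compute() lazily maps each parent record; the base-class constructor
// records a one-to-one dependency on prev.
class DoublingRDD(prev: RDD[Int]) extends RDD[Int](prev) {
  override def getPartitions: Array[Partition] = firstParent[Int].partitions
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    firstParent[Int].iterator(split, context).map(_ * 2)
}
Calling iterator(split, context) on such an RDD goes through the caching/checkpoint checks above before reaching this compute().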
TreeNode
How to explain TreeNode type restriction and self-type in Spark’s TreeNode?
TreeNode is the data structure Spark uses to organize the computation flow. Its subclasses include Expression and QueryPlan; QueryPlan is implemented by LogicalPlan and SparkPlan, the former being the logical plan and the latter the actual physical execution plan.
// -- SparkPlan.scala -- //
final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  doExecute()
}
protected def doExecute(): RDD[InternalRow]
execute() produces a new RDD; the concrete work is done in each subclass's doExecute(). In scan nodes such as FileSourceScanExec, doExecute() implements reading and converting data from the original data source; in operators such as HashAggregateExec, doExecute() implements the computation that transforms data coming from the upstream RDD into the next one.
doExecute() typically constructs the new RDD by calling one of the upstream RDD's mapPartitions()-style methods with a transformation function f: Iterator[T] => Iterator[U]; f is later invoked inside the new RDD's compute() method.
So only the final RDD needs to be constructed up front: calling its iterator() method invokes the concrete compute() method, which realizes each RDD transformation in turn and finally produces the target RDD.
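For intuition, a short hedged sketch with the public mapPartitions API (assuming sc is an existing SparkContext; the data and functions are made up): each call wraps the parent in a new RDD whose compute() applies f to the parent's iterator, and nothing runs until an action.
// -- Example.scala (hypothetical) -- //
val base = sc.parallelize(1 to 8, numSlices = 2)
val f: Iterator[Int] => Iterator[Int] = iter => iter.map(_ + 1)
// Each mapPartitions only records f; the chain stays lazy.
val chained = base.mapPartitions(f).mapPartitions(iter => iter.filter(_ % 2 == 0))
// The action walks the chain: iterator() -> compute() -> parent.iterator() -> ...
chained.collect()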
Shuffle
Spark SQL Query Engine Deep Dive (16) – ShuffleExchangeExec & UnsafeShuffleWriter
The job of shuffle is to redistribute the records in the inputRDD's partitions, mapping them into the partitions specified by the outputRDD.
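A hedged sketch of that mapping with the public API (made-up data; assuming sc is in scope): a Partitioner decides, per key, which output partition a record lands in, and partitionBy inserts the shuffle boundary.
// -- Example.scala (hypothetical) -- //
import org.apache.spark.HashPartitioner
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 3)
val partitioner = new HashPartitioner(2)
// Records are redistributed so that equal keys converge on one output partition.
val shuffled = pairs.partitionBy(partitioner)
// The key-to-partition rule itself is just:
val target = partitioner.getPartition("a") // non-negative "a".hashCode % 2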
SQL Submission Flow
The sql() method calls parsePlan() to create a LogicalPlan and returns a Dataset built from that plan.
// -- main.scala -- //
sc.sql("select avg(age), max(age) from people").show()
// -- SparkSession.scala -- //
def sql(sqlText: String): DataFrame = withActive {
  val tracker = new QueryPlanningTracker
  val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
    sessionState.sqlParser.parsePlan(sqlText)
  }
  Dataset.ofRows(self, plan, tracker)
}
sql() does not execute the plan directly; computation starts only when a result is actually needed, here when show() is called. show() ultimately calls executeCollect() to collect the results of executing the plan.
execute() mainly builds the chain of RDD transformations recursively, describing how each RDD is computed; the actual job is then submitted via sc.runJob().
// -- SparkPlan.scala -- //
def executeCollect(): Array[InternalRow] = {
  val byteArrayRdd = getByteArrayRdd()
  val results = ArrayBuffer[InternalRow]()
  byteArrayRdd.collect().foreach { countAndBytes =>
    decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
  }
  results.toArray
}
private def getByteArrayRdd(
    n: Int = -1, takeFromEnd: Boolean = false): RDD[(Long, Array[Byte])] = {
  execute() // execute() recursively builds the RDD chain, describing the computation
    .mapPartitionsInternal { iter =>
      var count = 0
      // ...
// -- RDD.scala -- //
def collect(): Array[T] = withScope {
  // sc.runJob is called here to submit the job
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
sc.runJob() ultimately calls dagScheduler.runJob(), which in turn calls submitJob() to wrap the job as a JobSubmitted (a case class) and push it onto eventProcessLoop, where it is handled by doOnReceive() in the DAGSchedulerEventProcessLoop implementation.
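The event-loop pattern itself is simple; here is a generic, hedged sketch (not Spark's actual EventLoop class) of what DAGSchedulerEventProcessLoop does:
// -- Example.scala (hypothetical) -- //
import java.util.concurrent.LinkedBlockingQueue
// Callers post events and return immediately; one dedicated thread drains
// the queue and dispatches each event (doOnReceive() plays the handler role).
class SimpleEventLoop[E](name: String)(handle: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  private val thread = new Thread(name) {
    override def run(): Unit =
      try while (true) handle(queue.take())
      catch { case _: InterruptedException => () } // stop() interrupts us
  }
  thread.setDaemon(true)
  def start(): Unit = thread.start()
  def stop(): Unit = thread.interrupt()
  def post(event: E): Unit = queue.put(event) // submitJob() posts JobSubmitted like this
}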
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit
In doOnReceive(), the match on JobSubmitted dispatches to handleJobSubmitted(), which mainly builds the finalStage and then calls submitStage() to submit it.
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    // ...
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  // ...
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
      Utils.cloneProperties(properties)))
  submitStage(finalStage)
}
In submitStage(), getMissingParentStages() uses the stage's dependencies to find the stages the current one depends on and submits them recursively; submitMissingTasks() then performs the actual job submission for each stage.
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
submitMissingTasks() first performs pre-processing for execution (such as computing task locations), then serializes the task, generates ShuffleMapTask or ResultTask instances depending on whether this is a shuffle or a result stage, and finally calls taskScheduler.submitTasks() to submit them. Once the current stage's tasks complete, the stage is marked as finished and the waiting child stages are submitted in turn.
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  // ...
  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null // the serialized task binary
  var partitions: Array[Partition] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    var taskBinaryBytes: Array[Byte] = null
    // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
    // consistent view of both variables.
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
      partitions = stage.rdd.partitions
    }
    if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
      logWarning(s"Broadcasting large task binary with size " +
        s"${Utils.bytesToString(taskBinaryBytes.length)}")
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // ...
  }
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          // create a ShuffleMapTask
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          // create a ResultTask
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    // ...
  }
  if (tasks.nonEmpty) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    // submit the tasks
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
      stage.resourceProfileId))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
submitTasks() places the taskSet under the management of schedulableBuilder and then calls backend.reviveOffers() to send the event. In local mode the backend implementation is LocalSchedulerBackend; in a typical distributed deployment it is CoarseGrainedSchedulerBackend (literally, a coarse-grained scheduler backend).
override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
    + "resource profile " + taskSet.resourceProfileId)
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
    // This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
    // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
    // and it completes. TSM2 finishes tasks for partition 1-9, and thinks he is still active
    // because partition 10 is not completed yet. However, DAGScheduler gets task completion
    // events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
    // and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a
    // TSM3 for it. As a stage can't have more than one active task set managers, we must mark
    // TSM2 as zombie (it actually is).
    stageTaskSets.foreach { case (_, ts) =>
      ts.isZombie = true
    }
    stageTaskSets(taskSet.stageAttemptId) = manager
    // add the task set manager to the scheduler
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run(): Unit = {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // send the revive-offers event to the backend
  backend.reviveOffers()
}
In LocalSchedulerBackend, the local RpcEndpoint.receive() method calls reviveOffers() upon receiving a ReviveOffers event; reviveOffers() hands tasks to executor.launchTask(), which runs them concurrently by placing each one into a thread pool.
So Spark implements task concurrency at the thread level; Hadoop's tasks, by comparison, are reportedly implemented at the process level.
// -- LocalSchedulerBackend.scala -- //
def reviveOffers(): Unit = {
  // local mode doesn't support extra resources like GPUs right now
  val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
    Some(rpcEnv.address.hostPort)))
  for (task <- scheduler.resourceOffers(offers, true).flatten) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, task)
  }
}
// -- Executor.scala -- //
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription, plugins)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
  if (decommissioned) {
    log.error(s"Launching a task while in decommissioned state.")
  }
}
TaskRunner is the Runnable wrapper around a task. Its run() method calls task.run(), which dispatches to the concrete implementation: either ShuffleMapTask or ResultTask.
In ShuffleMapTask, dep.shuffleWriterProcessor.write() writes the shuffle stage's intermediate results out to files. At present the only shuffle manager behind getWriter in write() is SortShuffleManager, whose write() method writes the result of rdd.iterator(partition, context) to files.
In ResultTask, func(context, rdd.iterator(partition, context)) emits the result, where func is supplied by the job submitter in sc.runJob(); in this example it is the val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) call in SparkPlan, which collects the RDD's final output into an Array.
// -- ShuffleMapTask.scala -- //
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  val rdd = rddAndDep._1
  val dep = rddAndDep._2
  // While we use the old shuffle fetch protocol, we use partitionId as mapId in the
  // ShuffleBlockId construction.
  val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
    partitionId
  } else context.taskAttemptId()
  dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
}
// -- ResultTask.scala -- //
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  func(context, rdd.iterator(partition, context))
}
This completes the Spark SQL submission flow.
To summarize: sql() parses the statement into a LogicalPlan; an action such as show() triggers executeCollect(), whose execute() recursively builds the RDD chain; sc.runJob() hands the job to the DAGScheduler, which splits it into stages and tasks; the TaskScheduler submits the TaskSet to the backend; and the Executor runs each TaskRunner on its thread pool.
Linear Models
§1 Preliminaries
§1.1 Maximum Likelihood Estimation (MLE)
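For reference, the standard statement: given independent samples, MLE picks the parameters maximizing the likelihood (equivalently, its log), which §2 and §3 below use repeatedly.
$$ \hat{\theta}_{\mathrm{MLE}}=\arg\max_\theta\prod_{i=1}^{n}p(y^{(i)}\mid x^{(i)};\theta)=\arg\max_\theta\sum_{i=1}^{n}\log p(y^{(i)}\mid x^{(i)};\theta) $$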
§1.2 Matrix Derivatives
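For reference, the standard entrywise definition for $f:\mathbb{R}^{n\times d}\to\mathbb{R}$, plus the two identities that §2's closed-form derivation relies on: $\nabla_\theta\, b^T\theta=b$ and $\nabla_\theta\,\theta^TA\theta=2A\theta$ for symmetric $A$ (note $X^TX$ is symmetric).
$$ \nabla_A f(A)=\begin{bmatrix}\frac{\partial f}{\partial A_{11}}&\cdots&\frac{\partial f}{\partial A_{1d}}\\ \vdots&\ddots&\vdots\\ \frac{\partial f}{\partial A_{n1}}&\cdots&\frac{\partial f}{\partial A_{nd}}\end{bmatrix} $$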
§1.3 The Exponential Family
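For reference, the standard form, which §4's derivation matches against:
$$ p(y;\eta)=b(y)\exp\left(\eta^{T}T(y)-a(\eta)\right) $$
where $\eta$ is the natural parameter, $T(y)$ the sufficient statistic, $a(\eta)$ the log-partition function, and $b(y)$ the base measure.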
§1.4 Multinomial Distribution
The probability mass function of the multinomial distribution is:
$$ P\left(X_1=k_1,X_2=k_2,\cdots,X_n=k_n\right)=\dfrac{n!}{k_1!\,k_2!\cdots k_n!}\prod_{i=1}^{n}p_i^{k_i},\quad \sum_{i=1}^{n}k_i=n $$
§2 Linear Model
Objective function:
$$ h(x) = \theta^{\mathrm{T}}x $$
Cost function (LMS):
$$ J(\theta) = \frac{1}{2} \sum\limits_{i=1}^{n} (h_\theta(x_i) - y_i)^2 $$
Method 1: tune the parameter $\theta$ by gradient descent.
First compute the gradient direction:
$$ \begin{array}{rcl} \frac{\partial}{\partial\theta_j}J(\theta) &=&\frac{\partial}{\partial\theta_j}\frac{1}{2}\left(h_\theta(x)-y\right)^2\\ &=&2\cdot\frac{1}{2}\left(h_\theta(x)-y\right)\cdot\frac{\partial}{\partial\theta_j}\left(h_\theta(x)-y\right)\\ &=&\left(h_\theta(x)-y\right)\cdot\frac{\partial}{\partial\theta_j}\left(\sum_{i=0}^{d}\theta_ix_i-y\right)\\ &=&\left(h_\theta(x)-y\right)x_j \end{array} $$
Then iteratively adjust $\theta$ according to the following update.
$$ \theta_j:=\theta_j+\alpha\left(y^{(i)}-h_\theta(x^{(i)})\right)x_j^{(i)} $$
Method 2: obtain the optimal parameter directly via the closed-form solution.
First compute the gradient of the loss:
$$ \begin{array}{rcl} \nabla_\theta J(\theta) &=&\nabla_\theta\frac{1}{2}(X\theta-\vec{y})^T(X\theta-\vec{y})\\ &=&\frac{1}{2}\nabla_\theta\left((X\theta)^T X\theta-(X\theta)^T\vec{y}-\vec{y}^T(X\theta)+\vec{y}^T\vec{y}\right)\\ &=&\frac{1}{2}\nabla_\theta\left(\theta^T(X^T X)\theta-\vec{y}^T(X\theta)-\vec{y}^T(X\theta)\right)\\ &=&\frac{1}{2}\nabla_\theta\left(\theta^T(X^T X)\theta-2(X^T\vec{y})^T\theta\right)\\ &=&\frac{1}{2}\left(2X^T X\theta-2X^T\vec{y}\right)\\ &=&X^T X\theta-X^T\vec{y} \end{array} $$
Setting $\nabla_\theta J(\theta) = 0$ gives
$$ \theta = (X^TX)^{-1}X^T\vec{y} $$
Probabilistic interpretation: why choose $J(\theta) = \frac{1}{2} \sum\limits_{i=1}^{n} (h_\theta(x_i) - y_i)^2$ as the cost function? If we assume $y^{(i)}=\theta^Tx^{(i)}+\epsilon^{(i)}$ with i.i.d. noise $\epsilon^{(i)}\sim\mathcal{N}(0,\sigma^2)$, then maximizing the log-likelihood of the data is equivalent to minimizing exactly this $J(\theta)$.
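A minimal, hedged Scala sketch of Method 1 (plain arrays; the function name and defaults are invented; each x carries a leading 1 for the intercept):
// -- Example.scala (hypothetical) -- //
// Batch gradient descent for J(theta) = 1/2 * sum_i (theta^T x_i - y_i)^2.
def lmsGradientDescent(
    xs: Array[Array[Double]], ys: Array[Double],
    alpha: Double = 0.01, steps: Int = 1000): Array[Double] = {
  def h(theta: Array[Double], x: Array[Double]): Double =
    theta.indices.map(j => theta(j) * x(j)).sum // theta^T x
  val d = xs.head.length
  var theta = Array.fill(d)(0.0)
  for (_ <- 1 to steps) {
    // full-batch gradient of J: grad_j = sum_i (h(x_i) - y_i) * x_ij
    val grad = Array.tabulate(d) { j =>
      xs.indices.map(i => (h(theta, xs(i)) - ys(i)) * xs(i)(j)).sum
    }
    theta = Array.tabulate(d)(j => theta(j) - alpha * grad(j)) // descent step
  }
  theta
}
// e.g. lmsGradientDescent(Array(Array(1.0, 0.0), Array(1.0, 2.0)), Array(1.0, 5.0))
// should converge to approximately Array(1.0, 2.0)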
§3 Logistic Regression
By maximum likelihood: in logistic regression, $y$ given $x$ follows a Bernoulli distribution, with probability
$$ p(y|x;\theta)=(h_\theta(x))^y(1-h_\theta(x))^{1-y} $$
We maximize the likelihood $L(\theta)$:
$$ \begin{array}{rcl} L(\theta) &=&p(\vec{y}\mid X;\theta)\\ &=&\prod\limits_{i=1}^np(y^{(i)}\mid x^{(i)};\theta)\\ &=&\prod\limits_{i=1}^n\left(h_\theta(x^{(i)})\right)^{y^{(i)}}\left(1-h_\theta(x^{(i)})\right)^{1-y^{(i)}} \end{array} $$
For ease of solution, take the logarithm of $L(\theta)$:
$$ \begin{array}{rcl} \ell(\theta) &=&\log L(\theta)\\ &=&\sum_{i=1}^n y^{(i)}\log h(x^{(i)})+(1-y^{(i)})\log(1-h(x^{(i)})) \end{array} $$
Differentiate $\ell(\theta)$. Here $g(\cdot)$ is the link function, $g(x)=\frac{1}{1+e^{-x}}$, called the sigmoid function; its derivative is exactly $g'(x)=g(x)(1-g(x))$.
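That derivative is worth verifying once, since the next step leans on it:
$$ g'(x)=\frac{e^{-x}}{(1+e^{-x})^2}=\frac{1}{1+e^{-x}}\cdot\frac{e^{-x}}{1+e^{-x}}=g(x)\left(1-g(x)\right) $$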
$$ \begin{array}{rcl} \frac{\partial}{\partial\theta_j}\ell(\theta) &=&\left(y\frac{1}{g(\theta^Tx)}-(1-y)\frac{1}{1-g(\theta^Tx)}\right)\frac{\partial}{\partial\theta_j}g(\theta^Tx)\\ &=&\left(y\frac{1}{g(\theta^Tx)}-(1-y)\frac{1}{1-g(\theta^Tx)}\right)g(\theta^Tx)(1-g(\theta^Tx))\frac{\partial}{\partial\theta_j}\theta^Tx\\ &=&\left(y(1-g(\theta^Tx))-(1-y)g(\theta^Tx)\right)x_j\\ &=&(y-h_\theta(x))\:x_j \end{array} $$
Update the parameter $\theta$ by the gradient ascent rule (we are maximizing $\ell$):
$$ \theta_j:=\theta_j+\alpha\left(y^{(i)}-h_\theta(x^{(i)})\right)x_j^{(i)} $$
Formally this is the same update as in the linear model; the difference lies in $h_\theta$: for the linear model $h_\theta(x)=\omega x+b$, whereas here $h_\theta(x)=g(\omega x+b)=\frac{1}{1+e^{-(\omega x+b)}}$.
§4 Generalized Linear Models (GLMs)
§4.1 Softmax Regression
Softmax regression targets multi-class problems. Here we assume $y$ follows an exponential-family distribution, i.e. $y\mid x;\theta \sim \mathrm{ExponentialFamily}(\eta)$, and that the family member in question is the multinomial distribution.
$$ \begin{array}{rcl} p(y|x;\theta) &=&\phi_1^{1\{y=1\}}\phi_2^{1\{y=2\}}\phi_3^{1\{y=3\}}\cdots\phi_k^{1\{y=k\}}\\ &=&\phi_1^{1\{y=1\}}\phi_2^{1\{y=2\}}\phi_3^{1\{y=3\}}\cdots\phi_k^{1-\sum_{i=1}^{k-1}1\{y=i\}}\\ &=&\phi_1^{(T(y))_1}\phi_2^{(T(y))_2}\phi_3^{(T(y))_3}\cdots\phi_k^{1-\sum_{i=1}^{k-1}(T(y))_i}\\ &=&\exp\left((T(y))_1\log(\phi_1)+(T(y))_2\log(\phi_2)+\cdots+\left(1-\sum_{i=1}^{k-1}(T(y))_i\right)\log(\phi_k)\right)\\ &=&\exp\left((T(y))_1\log\left(\frac{\phi_1}{\phi_k}\right)+(T(y))_2\log\left(\frac{\phi_2}{\phi_k}\right)+\cdots+(T(y))_{k-1}\log\left(\frac{\phi_{k-1}}{\phi_k}\right)+\log(\phi_k)\right)\\ &=&b(y)\exp\left(\eta^TT(y)-a(\eta)\right) \end{array} $$
Matching against the exponential-family form, we can read off:
$$ \begin{array}{rcl} \eta &=&\begin{bmatrix} \log(\frac{\phi_1}{\phi_k})\\ \log(\frac{\phi_2}{\phi_k})\\ \vdots\\ \log(\frac{\phi_{k-1}}{\phi_k}) \end{bmatrix}\\ a(\eta)&=&-\log(\phi_k)\\ b(y)&=&1 \end{array} $$
Here $T(y)$ is the $(k-1)$-dimensional indicator vector with $(T(y))_i=1\{y=i\}$; applying the GLM assumption $h_\theta(x)=\mathrm{E}[T(y)\mid x;\theta]$, we have
$$ \begin{array}{rcl} h_{\theta}(x) &=&\mathrm{E}[T(y)\mid x;\theta]\\ &=&\mathrm{E}\left[\begin{matrix} 1\{y=1\}\\ 1\{y=2\}\\ \vdots\\ 1\{y=k-1\} \end{matrix}\;\middle|\;x;\theta\right]\\ &=&\begin{bmatrix} \phi_1\\ \phi_2\\ \vdots\\ \phi_{k-1} \end{bmatrix}\\ &=&\begin{bmatrix} \frac{\exp(\theta_1^Tx)}{\sum_{j=1}^{k}\exp(\theta_j^Tx)}\\ \frac{\exp(\theta_2^Tx)}{\sum_{j=1}^{k}\exp(\theta_j^Tx)}\\ \vdots\\ \frac{\exp(\theta_{k-1}^Tx)}{\sum_{j=1}^{k}\exp(\theta_j^Tx)} \end{bmatrix} \end{array} $$
Substituting this $h_\theta(x)$ into the multinomial probability function, we find $\theta$ by maximum likelihood:
$$ \begin{array}{rcl}\ell(\theta) &=&\sum\limits_{i=1}^n\log p(y^{(i)}|x^{(i)};\theta)\\ &=&\sum\limits_{i=1}^n\log\prod\limits_{l=1}^k\left(\dfrac{e^{\theta_l^Tx^{(i)}}}{\sum\limits_{j=1}^ke^{\theta_j^Tx^{(i)}}}\right)^{1\{y^{(i)}=l\}}\end{array} $$
which can be solved by gradient ascent or Newton's method.
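A small, hedged Scala sketch of the hypothesis $h_\theta(x)$ itself (invented function name and shapes; the rows of thetas are the per-class parameter vectors $\theta_l$):
// -- Example.scala (hypothetical) -- //
// phi_l = exp(theta_l^T x) / sum_j exp(theta_j^T x); subtracting the max
// logit first is the usual numerical-stability trick.
def softmaxProbs(thetas: Array[Array[Double]], x: Array[Double]): Array[Double] = {
  val logits = thetas.map(t => t.indices.map(j => t(j) * x(j)).sum) // theta_l^T x
  val m = logits.max
  val exps = logits.map(z => math.exp(z - m))
  val total = exps.sum
  exps.map(_ / total)
}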
Trie
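A minimal, hedged Scala sketch of the structure (insert plus prefix lookup, roughly what route matching and autocompletion build on):
// -- Example.scala (hypothetical) -- //
import scala.collection.mutable
// Each node keys its children by character; isWord marks a complete entry.
class Trie {
  private class Node {
    val children = mutable.Map.empty[Char, Node]
    var isWord = false
  }
  private val root = new Node
  def insert(word: String): Unit = {
    var node = root
    for (c <- word) node = node.children.getOrElseUpdate(c, new Node)
    node.isWord = true
  }
  def contains(word: String): Boolean = findNode(word).exists(_.isWord)
  def startsWith(prefix: String): Boolean = findNode(prefix).isDefined
  // Walk the prefix; give up as soon as a character is missing.
  private def findNode(s: String): Option[Node] = {
    var node = root
    for (c <- s) node.children.get(c) match {
      case Some(next) => node = next
      case None => return None
    }
    Some(node)
  }
}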
Lagrange Multipliers
Reference: Tongji University, Advanced Mathematics (Vol. II).
Extrema of a function of two variables,
The method of Lagrange multipliers,
The problem of finding an extremum of the objective function
$$ z=f(x,y) $$
subject to the constraint
$$ \varphi(x,y)=0 $$
is equivalent to finding the extremum of the expression below.
$$ \mathcal{L}(x,y,\lambda)=f(x,y)-\lambda\cdot\varphi(x,y) $$
Derivation,
Proof,
Applications, e.g. the worked example below.
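A standard worked example: find the extremum of $f(x,y)=xy$ subject to $\varphi(x,y)=x+y-1=0$.
$$ \mathcal{L}(x,y,\lambda)=xy-\lambda(x+y-1),\qquad \frac{\partial\mathcal{L}}{\partial x}=y-\lambda=0,\quad \frac{\partial\mathcal{L}}{\partial y}=x-\lambda=0,\quad \frac{\partial\mathcal{L}}{\partial\lambda}=-(x+y-1)=0 $$
Solving gives $x=y=\lambda=\frac{1}{2}$, so the constrained extremum (here a maximum) is $f\left(\frac{1}{2},\frac{1}{2}\right)=\frac{1}{4}$.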