八皇后问题

转自我的CSDN博客: http://blog.csdn.net/njdragonfly

8皇后问题和由他推广得到的N皇后问题来源于国际象棋的玩法,因为皇后所在的位置可以纵向、横向、两个斜向四个方向的“捕捉”,所以8皇后问题就是要求如何布置8个皇后在8*8的棋盘上而使他们互相无法“捕捉”。也就是说不存在两个皇后同行或同列,或在同一斜线上。而N皇后问题就是如何布置N个皇后在N*N棋盘里使不存在两个皇后在同行同列和同一斜线上。因为8皇后问题可以归为N皇后问题,所以下面按照N皇后问题来进行讨论。

解决N皇后问题的最好最著名的算法就是回溯法。在算法设计的基本方法中,回溯法是最一般的方法之一。在那些涉及到寻找一组解的问题或者求满足某些约束条件的最优解的问题中,有许多可以用回溯法来求解。回溯法是一个既带有系统性又带有跳跃性的的搜索算法。它在包含问题的所有解的解空间树中,按照深度优先的策略,从根结点出发搜索解空间树。算法搜索至解空间树的任一结点时,总是先判断该结点是否肯定不包含问题的解。如果肯定不包含,则跳过对以该结点为根的子树的系统搜索,逐层向其祖先结点回溯。否则,进入该子树,继续按深度优先的策略进行搜索。回溯法在用来求问题的所有解时,要回溯到根,且根结点的所有子树都已被搜索遍才结束。而回溯法在用来求问题的任一解时,只要搜索到问题的一个解就可以结束。这种以深度优先的方式系统地搜索问题的解的算法称为回溯法,它适用于解一些组合数较大的问题。

回溯法的基本思想:确定了解空间的组织结构后,回溯法就从开始结点(根结点)出发,以深度优先的方式搜索整个解空间。这个开始结点就成为一个活结点,同时也成为当前的扩展结点。在当前的扩展结点处,搜索向纵深方向移至一个新结点。这个新结点就成为一个新的活结点,并成为当前扩展结点。如果在当前的扩展结点处不能再向纵深方向移动,则当前扩展结点就成为死结点。换句话说,这个结点不再是一个活结点。此时,应往回移动(回溯)至最近的一个活结点处,并使这个活结点成为当前的扩展结点。回溯法即以这种工作方式递归地在解空间中搜索,直至找到所要求的解或解空间中已没有活结点时为止。
运用回溯法解题通常包含以下三个步骤:
(1)针对所给问题,定义问题的解空间;
(2)确定易于搜索的解空间结构;
(3)以深度优先的方式搜索解空间,并且在搜索过程中用剪枝函数避免无效搜索;

一般回溯法可用递归来实现,下面是从网上找来的一个非常典型的递归程序结构。

procedure try(i:integer);
var
begin
     if i>n then 输出结果
     else   for j:=下界 to 上界 do
          begin
           x[i]:=h[j];
           if 可行{满足限界函数和约束条件} then begin 置值;try(i+1); end;
        end;
end;

说明:
i是递归深度;
n是深度控制,即解空间树的的高度;
可行性判断有两方面的内容:不满约束条件则剪去相应子树;若限界函数越界,也剪去相应子树;两者均满足则进入下一层,直到最后的叶子输出结果。

回到N皇后问题的解决来,看看如何用回溯法解。首先找出解空间:给棋盘的行和列都编上1到N的号码,皇后也给编上1到N的号码。由于一个皇后应在不同的行上,为不失一般性,可以假定第i个皇后将放在第i行上的某列。因此N皇后问题的解空间可以用一个N元组(X1,X2,…..Xn)来表示,其中Xi是放置皇后i所在的列号。这意味着所有的解都是N元组(1,2,3,…….,N)的置换。解空间大小为N!。其次我们看约束条件:因为解空间已经给我们排除了不在同一行(因为每个皇后分别已经对应不同的行号)的约束条件。我们要判断的是不在同一列和不在同一斜线的约束。因为Xi表示皇后所在的列号,所以如果存在X(k)=X(i)那么肯定存在第k个皇后和第i个皇后同列。所以不同列的判段条件是X(k)!=X(i),1<k<i 。又因为同一斜线的特征是要么行号和列号之和不变(右高左低)要么是行号和列号只差相等(左高右低),所以同斜线的判断条件是 i+X(i)=  k+X(k) 或 i-X(i) =k-X(k),两式合并得 |X(i)-X(k)|=|i-k|  。

编程基本思路:X(j)表示一个解的空间,j表示行数,里面的值表示可以放置在的列数,抽象约束条件得到能放置一个皇后的约束条件(1)X(i)!=X(k);(2)abs(X(i)-X(k))!=abs(i-k)。应用回溯法,当可以放置皇后时就继续到下一行,不行的话就返回到第一行,重新检验要放的列数,如此反复,直到将所有解解出。

#include <iostream.h>
#include <math.h>
/*检查可不可以放置一个新的皇后*/
bool place(int k, int *X)
{
        int i;
        i=1;
        while(i<k)
        {
                  if((X[i]==X[k])||(abs(X[i]-X[k])==abs(i-k)))
                  return false;
                  i++;
         }
        return true;
}

/*求解问题的所有解*/
void Nqueens(int n,int *X)
{
       int k;
       X[1]=0;k=1;
       while(k>0)
       {
             X[k]=X[k]+1; //不断的在解空间里从小到大的试探

             while((X[k]<=n)&&(!place(k, X)))
                      X[k]=X[k]+1;                     //不符合条件的马上再取解空间的下一个值来试探。

             if(X[k]<=n)   //找到了一个位置,而且是合法的
                  if(k==n)   //是不是最后一个皇后,若是则得出一个完整解
                 {
                          for(int i=1;i<=n;i++)
                          cout<<X[i]<<" ";
                          cout<<"/n";
                   }
                  else    //若不是最后一个皇后,则给下一个皇后找位置
                 {
                          k=k+1;
                          X[k]=0;
                 }

             else      k=k-1;  //若是找了全部的列都无法放置某个皇后,则回溯到上一个k的情况,让上一个k再往下试
        }

}
/*主函数*/

void main()
{
        cout<<"|--------------N皇后问题--------------|"<<endl;
        cout<<"|-------------------------------------|"<<endl<<endl;
        int n;
        int *X;
        int i;
        while(i)
       {
                 cout<<"请输入皇后的个数:";
                 cin>>n;
                 X=new int[n];
                 cout<<"问题的解有:"<<endl;
                 Nqueens(n,X);
                 cout<<"Press<1> to run again"<<endl;
                 cout<<"Press<0> to exit"<<endl;
                 cin>>i;
         }
发表在 代码片段, 算法 | 八皇后问题已关闭评论

Paxos算法

转自我的CSDN博客: http://blog.csdn.net/njdragonfly

Paxos算法莱斯利·兰伯特(Leslie Lamport,就是 LaTeX 中的”La”,此人现在在微软研究院)于1990年提出的一种基于消息传递的一致性算法。[1] 这个算法被认为是类似算法中最有效的。

 

问题和假设

Paxos 算法解决的问题是一个分布式系统如何就某个值(决议)达成一致。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个「一致性算法」以保证每个节点看到的指令一致。一个通用的一致性算法可以应用在许多场景中,是分布式计算中的重要问题。因此从20世纪80年代起对于一致性算法的研究就没有停止过。节点通信存在两种模型:共享内存(Shared memory)和消息传递(Messages passing)。Paxos 算法就是一种基于消息传递模型的一致性算法。

为描述 Paxos 算法,Lamport 虚拟了一个叫做 Paxos 的希腊城邦,这个岛按照议会民主制的政治模式制订法律,但是没有人愿意将自己的全部时间和精力放在这种事情上。所以无论是议员,议长或者传递纸条的服务员都不能承诺别人需要时一定会出现,也无法承诺批准决议或者传递消息的时间。但是这里假设没有拜占庭将军问题(Byzantine failure,即虽然有可能一个消息被传递了两次,但是绝对不会出现错误的消息);只要等待足够的时间,消息就会被传到。另外,Paxos 岛上的议员是不会反对其他议员提出的决议的。

对应于分布式系统,议员对应于各个节点,制定的法律对应于系统的状态。各个节点需要进入一个一致的状态,例如在独立Cache对称多处理器系统中,各个处理器读内存的某个字节时,必须读到同样的一个值,否则系统就违背了一致性的要求。一致性要求对应于法律条文只能有一个版本。议员和服务员的不确定性对应于节点和消息传递通道的不可靠性。

算法

算法的提出与证明

首先将议员的角色分为 proposers,acceptors,和 learners(允许身兼数职)。proposers 提出决议,acceptors 批准决议,learners「学习」决议。划分角色后,就可以更精确的定义问题:

  1. 决议(value)只有在被 proposers 提出后才能批准(未经批准的决议称为「提案(proposal)」);
  2. 在一次 Paxos 算法的执行实例中,只批准一个 Value;
  3. learners 只能获得被批准(chosen)的 Value。

另外还需要保证 Progress。这一点以后再讨论。

作者通过不断加强上述3个约束(主要是第二个)获得了 Paxos 算法。

批准 value 的过程中,首先 proposers 将 value 发送给 acceptors,之后 acceptors 对 value 进行批准。为了满足只批准一个 value 的约束,要求经「多数派(majority)」批准的 value 成为正式的决议(称为「通过」决议)。这是因为无论是按照人数还是按照权重划分,两组「多数派」至少有一个公共的 acceptor,如果每个 acceptor 只能接受一个 value,约束2就能保证。

于是产生了一个显而易见的新约束:

P1:一个 acceptor 必须批准它接收到的第一个 value。

注意 P1 是不完备的。如果恰好一半 acceptor 批准 value A,另一半批准 value B,那么就无法形成多数派,无法批准任何一个值。

约束2并不要求只通过一个提案, 暗示可能存在多个提案。只要提案的 value 是一样的,通过多个提案不违背约束2。于是可以产生约束 P2:

P2:一旦一个 value 被批准(chosen),那么之后批准(chosen)的 value 必须和这个 value 一样。

注:通过某种方法可以为每个提案分配一个编号,在提案之间建立一个全序关系,后提出的提案编号大。

如果 P1 和 P2 都能够保证,那么约束2就能够保证。

批准一个value意味着多个acceptor通过(accept)了该value. 因此,可以对P2 进行加强:

P2a:一旦一个 value v 被批准(chosen),那么之后任何 acceptor 再通过(accept)的 value 必须是 v。

由于通信是异步的,P2a 和 P1 会发生冲突。如果一个 value 通过后,一个 proposer 和一个 acceptor 从休眠中苏醒,前者提出一个新的 value,根据 P1,后者应当批准;根据 P2a,则不应当批准。于是需要对 proposer 的行为进行约束:

P2b:一旦一个 value v 被批准(chosen),那么以后 proposer 提出的新提案必须具有 value v。

P2b 蕴涵了 P2a,是一个更强的约束。

但是根据 P2b 难以提出实现手段。因此需要进一步加强 P2b。

假设一个编号为 m 的 value v 已经获得批准(chosen),来看看在什么情况下对任何编号为 n(n>m)的提案都含有 value v。因为 m 已经获得批准(chosen),显然存在一个 acceptors 的多数派 C,他们都通过(accept)了 v。考虑到任何多数派都和 C 具有至少一个公共成员,可以找到一个蕴涵 P2b 的约束 P2c:

P2c:如果一个编号为 n 的提案具有 value v,那么存在一个多数派,要么他们中没有人通过(accept)过编号小于 n 
的任何提案,要么他们进行的最近一次批准(chosen)具有 value v。

可以用数学归纳法证明 P2c 蕴涵 P2b:假设具有 value v 的提案 m 获得通过,当 n=m+1 时,根据 P2c,由于任何一个多数派中至少有一个批准了 m,因此提案具有 value v;若 (m+1)..(n-1) 所有提案都具有 value v,根据 P2c,若反设新提案 n 不具有 value v 则存在一个多数派,他们没有批准过 m..(n-1) 中的任何提案。但是我们知道,他们中至少有一个人批准了 m。于是我们导出了矛盾,获得了证明。

P2c 是可以通过消息传递模型实现的。另外,引入了 P2c 后,解决了前文提到的 P1 不完备的问题。

算法的内容

要满足 P2c 的约束,proposer 提出一个提案前,首先要和足以形成多数派的 acceptors 进行通信,获得他们进行的最近一次批准活动的编号(prepare 过程),之后根据回收的信息决定这次提案的 value,形成提案开始投票。当获得多数 acceptors 批准后,提案获得通过,由 proposer 将这个消息告知 learner。这个简略的过程经过进一步细化后就形成了 Paxos 算法。

每个提案需要有不同的编号,且编号间要存在偏序关系。可以用多种方法实现这一点,例如将序数和 proposer 的名字拼接起来。如何做到这一点不在 Paxos 算法讨论的范围之内。

如果一个 acceptor 在 prepare 过程中回答了一个 proposer 针对”草案” n 的问题,但是在开始对 n 进行投票前,又批准另一个提案(例如 n-1),如果两个提案具有不同的 value,这个投票就会违背 P2c。因此在 prepare 过程中,acceptor 进行的回答同时也应包含承诺:不会再批准编号小于 n 的提案。这是对 P1 的加强:

P1a:当且仅当 acceptor 没有收到编号大于 n 的 prepare 请求时,acceptor 批准编号为 n 的提案。

现在已经可以提出完整的算法了。

决议的提出与通过

通过一个决议分为两个阶段:

  1. prepare 阶段:
    1. proposer 选择一个提案编号 n 并将 prepare 请求发送给 acceptors 中的一个多数派;
    2. acceptor 收到 prepare 消息后,如果提案的编号大于它已经回复的所有 prepare 消息,则 acceptor 将自己上次的批准回复给 proposer,并承诺不再批准小于 n 的提案;
  2. 批准阶段:
    1. 当一个 proposor 收到了多数 acceptors 对 prepare 的回复后,就进入批准阶段。它要向回复 prepare 请求的 acceptors 发送 accept 请求,包括编号 n 和根据 P2c 决定的 value(如果根据 P2c 没有决定 value,那么它可以自由决定 value)。
    2. 在不违背自己向其他 proposer 的承诺的前提下,acceptor 收到 accept 请求后即批准这个请求。

这个过程在任何时候中断都可以保证正确性。例如如果一个 proposer 发现已经有其他 proposers 提出了编号更高的提案,则有必要中断这个过程。因此为了优化,在上述 prepare 过程中,如果一个 acceptor 发现存在一个更高编号的”草案”,则需要通知 proposer,提醒其中断这次提案。

实例

用实际的例子来更清晰地描述上述过程:

有 A1, A2, A3, A4, A5 5位议员, 就税率问题进行决议. 议员 A1 决定将税率定为 10%, 因此它向所有人发出一个草案. 这个草案的内容是:

现有的税率是什么? 如果没有决定, 则建议将其定为 10%. 时间: 本届议会第3年3月15日; 提案者: A1

在最简单的情况下, 没有人与其竞争; 信息能及时顺利地传达到其它议员处.

于是, A2-A5 回应:

我已收到你的提案, 等待最终批准

而 A1 在收到2份回复后就发布最终决议:

税率已定为 10%, 新的提案不得再讨论本问题.

这实际上退化为二段提交协议.

现在我们假设在 A1 提出提案的同时, A5 决定将税率定为 20%:

现有的税率是什么? 如果没有决定, 则建议将其定为 20%. 时间: 本届议会第3年3月15日; 提案者: A5

草案要通过侍从送到其它议员的案头. A1 的草案将由5位侍从送到 A2-A5 那里. 现在, 负责 A2 和 A3 的侍从将草案顺利送达, 负责 A4 和 A5 的侍从则不上班. A5 的草案则顺利的送至 A3 和 A4 手中.

现在, A1, A2, A3 收到了 A1 的提案; A3, A4, A5 收到了 A5 的提案. 按照协议, A1, A2, A4, A5 将接受他们收到的提案, 侍从将拿着

我已收到你的提案, 等待最终批准

的回复回到提案者那里.

而 A3 的行为将决定批准哪一个.

情况一

假设 A1 的提案先送到 A3 处, 而 A5 的侍从决定放假一段时间. 于是 A3 接受并派出了侍从. A1 等到了两位侍从, 加上它自己已经构成一个多数派, 于是税率 10% 将成为决议. A1 派出侍从将决议送到所有议员处:

税率已定为 10%, 新的提案不得再讨论本问题.

A3 在很久以后收到了来自 A5 的提案. 由于税率问题已经讨论完毕, 他决定不再理会. 但是他要抱怨一句:

税率已在之前的投票中定为 10%, 你不要再来烦我!

这个回复对 A5 可能有帮助, 因为 A5 可能因为某种原因很久无法与与外界联系了. 当然更可能对 A5 没有任何作用, 因为 A5 可能已经从 A1 处获得了刚才的决议.

情况二

依然假设 A1 的提案先送到 A3 处, 但是这次 A5 的侍从不是放假了, 只是中途耽搁了一会. 这次, A3 依然会将”接受”回复给 A1. 但是在决议成型之前它又收到了 A5 的提案. 这时协议有两种处理方式:

1. 如果 A5 的提案更早, 按照传统应该由较早的提案者主持投票. 现在看来两份提案的时间一样(本届议会第3年3月15日). 但是 A5 是个惹不起的大人物. 于是 A3 回复:

我已收到您的提案, 等待最终批准, 但是您之前有人提出将税率定为 10%, 请明察.

于是, A1 和 A5 都收到了足够的回复. 这时关于税率问题就有两个提案在同时进行. 但是 A5 知道之前有人提出税率为 10%. 于是 A1 和 A5 都会向全体议员广播:

 税率已定为 10%, 新的提案不得再讨论本问题.

一致性得到了保证.

2. A5 是个无足轻重的小人物. 这时 A3 不再理会他, A1 不久后就会广播税率定为 10%.

情况三

在这个情况中, 我们将看见, 根据提案的时间及提案者的权势决定是否应答是有意义的. 在这里, 时间和提案者的权势就构成了给提案编号的依据. 这样的编号符合”任何两个提案之间构成偏序”的要求.

A1 和 A5 同样提出上述提案, 这时 A1 可以正常联系 A2 和 A3; A5 也可以正常联系这两个人. 这次 A2 先收到 A1 的提案; A3 则先收到 A5 的提案. A5 更有权势.

在这种情况下, 已经回答 A1 的 A2 发现有比 A1 更有权势的 A5 提出了税率 20% 的新提案, 于是回复 A5 说:

我已收到您的提案, 等待最终批准, 但是您之前有人提出将税率定为 10%, 请明察.

而回复了 A5 的 A3 发现新的提案者是个小人物, 不予理会.

于是 A5 将主持投票, 但决议的内容将是 A1 提出的税率 10%.

如果 A3 决定平等地对待每一位议员, 对 A1 做出”你之前有人提出将税率定为 20%” 的回复, 则将造成混乱. 这种情况下 A1 和 A5 都将试图主持投票, 但是这次两份提案的内容不同.

这种情况下, A3 若对 A1 进行回复, 只能说:

有更大的人物关注此事, 请等待他做出决定.

另外, 在这种情况下, A4 与外界失去了联系. 等到他恢复联系, 并需要得知税率情况时, 他(在最简单的协议中)将提出一个提案:

现有的税率是什么? 如果没有决定, 则建议将其定为 15%. 时间: 本届议会第3年4月1日; 提案者: A4

这时, (在最简单的协议中)其他议员将会回复:

税率已在之前的投票中定为 10%, 你不要再来烦我!

决议的发布

一个显而易见的方法是当 acceptors 批准一个 value 时,将这个消息发送给所有 learner。但是这个方法会导致消息量过大。

由于假设没有 Byzantine failures,learners 可以通过别的 learners 获取已经通过的决议。因此 acceptors 只需将批准的消息发送给指定的某一个 learner,其他 learners 向它询问已经通过的决议。这个方法降低了消息量,但是指定 learner 失效将引起系统失效。

因此 acceptors 需要将 accept 消息发送给 learners 的一个子集,然后由这些 learners 去通知所有 learners。

但是由于消息传递的不确定性,可能会没有任何 learner 获得了决议批准的消息。当 learners 需要了解决议通过情况时,可以让一个 proposer 重新进行一次提案。注意一个 learner 可能兼任 proposer。

Progress 的保证

根据上述过程当一个 proposer 发现存在编号更大的提案时将终止提案。这意味这提出一个编号更大的提案会终止之前的提案过程。如果两个 proposer 在这种情况下都转而提出一个编号更大的提案,就可能陷入活锁,违背了 Progress 的要求。这种情况下的解决方案是选举出一个 president,仅允许 president 提出提案。但是由于消息传递的不确定性,可能有多个 proposer 自认为自己已经成为 president。Lamport 在The Part-Time Parliament一文中描述并解决了这个问题。

发表在 分布式系统理论 | Paxos算法已关闭评论

PL0文法编译器C语言源代码

转自我的CSDN博客: http://blog.csdn.net/njdragonfly

对PL0文法进行了扩充,主要增加了数组及结构体的功能,并用C语言实现了之。

#include "stdio.h"
#include "string.h"
#include "stdlib.h"
#include "ctype.h"
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
typedef int BOOL;
#define cxmax 2000
#define amax 16383
#define imax 100 /* length of identifier table */
#define tmax 100 /* length of type table */
#define lmax 10  /* maximum level */
#define al 10  /* length of identifiers */
#define norw 27  /* number of reserverd words */
/* standard function */
#define fabs 0
#define fsqr 1
#define fodd 2
#define fchr 3
#define ford 4
#define fwrite 5
#define fwriteln 6
#define fread 7
#define freadln 8
#define feoln 9
/* standard types */
#define intip 1
#define booltip 2
#define chartip 3
/*指令码*/
typedef enum opcode{
	add, neg, mul, divd, remd, div2, rem2, eqli, neqi, lssi,
	leqi, gtri, geqi, dupl, swap, andb, orb,
	load, stor, hhalt, wri, wrc, wrl, rdi, rdc, rdl, eol,
	ldc, ldla, ldl, ldg, stl, stg, move, copy, addc, mulc,
	jump, jumpz, call, adjs, sets, pexit
}opcode;
/*指令结构体*/
typedef struct instr{
	opcode op;
	int a;
}instr;
/*词法类别*/
typedef enum symbol{
	ident, number, sstring, plus, minus, star, lbrack, rbrack,
	colon, eql, neq, lss, leq, gtr, geq, lparen, rparen, comma, 
	semicolon, period, becomes,
	beginsym, endsym, ifsym, thensym, elsesym, whilesym, dosym,
	casesym, repeatsym, untilsym, forsym, tosym, downtosym,
	notsym, divsym, modsym, andsym, orsym, constsym, varsym,
	typesym, arraysym, ofsym, recordsym, progsym, funcsym,
	procsym
}symbol;
/*变量类型*/
typedef enum idkind{
	konst, varbl, field, tipe, funkt
}idkind;
/*类型的种类,简单的,数组,记录类型*/
typedef enum tpkind{
	simple, arrays, records
}tpkind;
typedef char alfa[al+1];
instr code[cxmax + 1];
int m[amax + 1];
/*词法分析相关全局变量*/
char ch;
int cc = 0, ll = 0;
char line[129];
symbol sym;
alfa id;
int num;
char str[81];
int slen;
/*alfa word[norw + 1];*/
int cx;
int lev;
int dx;
BOOL labeled;
int nl; /* as namelist[-1] */
int namelist[lmax];
int ix, tx; /* indices in tables */
/* identifier table */
typedef struct ITAB{
	alfa name;
	int link;
	int tip;
	idkind kind;
	union{
		int val; /*常量类型的值*/
		struct{
			int vlevel;
			int vadr;
			BOOL refpar;
		};   /*变量类型的属性*/
		int offset; /*域类型的偏移地址*/
		struct{
			int flevel;
			int fadr;
			int lastpar;
			int resultadr;
			BOOL inside;
		};   /*函数类型的属性*/
	};
}ITAB;
ITAB itab[imax + 1];
/* type table */
typedef struct TTAB{
	int size;
	tpkind kind;
	union{
		struct{
			int low;
			int high;
			int elemtip;
		}; /*数组类型的属性*/
		int fields; /*记录类型最后一个域的地址*/
	};
}TTAB;
TTAB ttab[tmax + 1];
/*保留字*/
static struct{
	alfa name;
	symbol lex;
}word[] = {
	{ "", -1 },
	{ "begin", beginsym },
	{ "end", endsym },
	{ "if", ifsym },
	{ "then", thensym },
	{ "else", elsesym },
	{ "while", whilesym },
	{ "do", dosym },
	{ "case", casesym },
	{ "repeat", repeatsym },
	{ "until", untilsym },
	{ "for", forsym },
	{ "to", tosym },
	{ "downto", downtosym },
	{ "not", notsym },
	{ "div", divsym },
	{ "mod", modsym },
	{ "and", andsym },
	{ "or", orsym },
	{ "const", constsym },
	{ "var", varsym },
	{ "type", typesym },
	{ "array", arraysym },
	{ "of", ofsym },
	{ "record", recordsym },
	{ "program", progsym },
	{ "function", funcsym },
	{ "procedure", procsym }
};
FILE * source;
BOOL eof_flag = FALSE;
symbol search()
{
	int i;
	for(i = norw; i >= 1; i--)
	{
		if(strcmp(id, word[i].name) == 0)
			return word[i].lex;
	}
	return ident;
}
void error(int n)
{
	int i;
	for(i = 0; i < ll; i++)
		putchar(line[i]);
	for(i = 0; i <= cc - 1; i++)
		putchar(' ');
	printf("^/n");
	printf("error %d detected/n", n);
	exit(1);
}
void getch()
{
	if (cc == ll)
	{
		memset(line, 0, 129);
		if(feof(source))
		{
			fprintf(stderr, "program incomplete/n");
			exit(0);
		}
		ll = 0;
		cc = 0;
		while(!feof(source) && (ch = getc(source)) != '/n')
		{
			line[ll] = ch;
			ll++;
		}
		if(ch == '/n')
		{
			line[ll] = ch;
			ll++;
		}
	}
	ch = line[cc];
	cc++;
}
void getsym()
{
	int k;
	int strend;
	while(ch == ' ' || ch == '/t' || ch == '/n')
		getch();
	if(isalpha(ch))
	{
		memset(id, 0, al+1);
		k = 0;
		do{
			if(k != al)
			{
				id[k] = ch;
				k++;
			}
			getch();
		}while(isalnum(ch));
		sym = search();
	}
	else if(isdigit(ch))
	{
		num = 0;
		sym = number;
		do{
			num = 10 * num + (ch - '0');
			getch();
		}while(isdigit(ch));
	}
	else if(ch == ':')
	{
		getch();
		if(ch == '=')
		{
			getch();
			sym = becomes;
		}
		else
			sym = colon;
	}
	else if(ch == '>')
	{
		getch();
		if(ch == '=')
		{
			getch();
			sym = geq;
		}
		else
			sym = gtr;
	}
	else if(ch == '<')
	{
		getch();
		if(ch == '=')
		{
			getch();
			sym = leq;
		}
		else if(ch == '>')
		{
			getch();
			sym = neq;
		}
		else
			sym = lss;
	}
	else if(ch == '.')
	{
		getch();
		if(ch == '.')
		{
			getch();
			sym = colon;
		}
		else
			sym = period;
	}
	else if(ch == '/'')
	{
		slen = 0;
		strend = FALSE;
		sym = sstring;
		do{
			if(cc == ll)
				error(101);
			getch();
			if(ch == '/'')
			{
				getch();
				if(ch == '/'')
				{
					str[slen] = ch;
					slen++;
				}
				else
					strend = TRUE;
			}
			else
			{
				str[slen] = ch;
				slen++;
			}
		}while(strend == FALSE);
		if(slen == 0)
			error(102); /*不允许空字符串*/
		str[slen++] = '/0';
	}
	else if(ch == '+')
	{
		getch();
		sym = plus;
	}
	else if(ch == '-')
	{
		getch();
		sym = minus;
	}
	else if(ch == '*')
	{
		getch();
		sym = star;
	}
	else if(ch == '(')
	{
		getch();
		sym = lparen;
	}
	else if(ch == ')')
	{
		getch();
		sym = rparen;
	}
	else if(ch == '[')
	{
		getch();
		sym = lbrack;
	}
	else if(ch == ']')
	{
		getch();
		sym = rbrack;
	}
	else if(ch == '=')
	{
		getch();
		sym = eql;
	}
	else if(ch == ',')
	{
		getch();
		sym = comma;
	}
	else if(ch == ';')
	{
		getch();
		sym = semicolon;
	}
	else if(ch == '{')
	{
		do{
			getch();
		}while(ch != '}');
		getch();
		getsym();
	}
	else
		error(104);
}
void check(symbol s)
{
	if(sym != s)
		error(s);
}
void skip(symbol s)
{
	check(s);
	getsym();
}
/*将符号串登记入符号表*/
void enter(alfa id, idkind k, int t)
{
	int j;
	if(ix == imax)
		error(104);
	else
	{
		ix++;
		strcpy(itab[0].name, id);
		if(lev == -1)
			j = nl;
		else
			j = namelist[lev];
		while(strcmp(itab[j].name, id) != 0)
			j = itab[j].link;
		if(j != 0)
			error(105);
		else
		{
			strcpy(itab[ix].name, id);
			if(lev == -1)
				itab[ix].link = nl;
			else
				itab[ix].link = namelist[lev];
			itab[ix].tip = t;
			itab[ix].kind = k;
			if(lev == -1)
				nl = ix;
			else
				namelist[lev] = ix;
		}
	}
}
/*在符号表中查找符号,返回位置*/
int position()
{
	int i , j;
	strcpy(itab[0].name, id);
	i = lev;
	do{
		if(i == -1)
			j = nl;
		else
			j = namelist[i];
		while(strcmp(itab[j].name, id) != 0)
			j = itab[j].link;
		i = i - 1;
	}while(i >= -1 && j == 0);
	if(j == 0)
		error(106);
	return j;
}
void gen(instr i)
{
	switch(i.op)
	{
	case dupl:
	case eol:
	case ldc:
	case ldla:
	case ldl:
	case ldg:
		dx = dx - 1;
		break;
	case add:
	case mul:
	case divd:
	case remd:
	case eqli:
	case neqi:
	case lssi:
	case leqi:
	case gtri:
	case geqi:
	case andb:
	case orb:
	case wrc:
	case rdi:
	case rdc:
	case stl:
	case stg:
	case jumpz:
		dx = dx + 1;
		break;
	case stor:
	case wri:
	case move:
		dx = dx + 2;
		break;
	case copy:
		dx = dx - i.a + 1;
		break;
	case adjs:
		dx = dx + i.a;
		break;
	}
	if(!(((i.op == addc || i.op == adjs) && (i.a == 0)) || ((i.op == mulc) && (i.a == 1))))
	{
		if(labeled)
		{
			code[cx] = i;
			cx = cx +1;
			labeled = FALSE;
		}
		else if(code[cx - 1].op == ldc && i.op == add)
		{
			code[cx - 1].op = addc;
		}
		else if(code[cx - 1].op == ldc && i.op == mul)
		{
			code[cx - 1].op = mulc;
		}
		else if(code[cx - 1].op == ldc &&  i.op == neg)
		{
			code[cx - 1].a = -code[cx - 1].a;
		}
		else if(code[cx - 1].op == ldc && code[cx - 1].a == 2 && i.op == divd)
		{
			code[cx - 1].op = div2;
		}
		else if(code[cx - 1].op == ldc && code[cx - 1].a == 2 && i.op == remd)
		{
			code[cx - 1].op = rem2;
		}
		else if(code[cx - 1].op == ldc && i.op == stor)
		{
			code[cx - 1].op = stg;
		}
		else if(code[cx - 1].op == ldc && i.op == load)
		{
			code[cx - 1].op = ldg;
		}
		else if(code[cx - 1].op == ldla && i.op == stor)
		{
			code[cx - 1].op = stl;
		}
		else if(code[cx - 1].op == ldla && i.op == load)
		{
			code[cx - 1].op = ldl;
		}
		else
		{
			code[cx] = i;
			cx = cx + 1;
		}
	}
}
void gen0(opcode op)
{
	instr i;
	i.op = op;
	gen(i);
}
void gen1(opcode op, int a)
{
	instr i;
	i.op = op;
	i.a = a;
	gen(i);
}
int codelabel()
{
	labeled = TRUE;
	return cx;
}
void address(int lv, int ad)
{
	if(lv == 0)
		gen1(ldc, ad);
	else if(lv == lev)
		gen1(ldla, ad - dx);
	else
	{
		gen1(ldl, -dx);
		while(lv + 1 != lev)
		{
			gen0(load);
			lv = lv + 1;
		}
		gen1(addc, ad);
	}
}
void addressvar(int ref)
{
	address(itab[ref].vlevel, itab[ref].vadr);
	if(itab[ref].refpar)
		gen0(load);
}
void mustbe(int x, int y)
{
	if(x != y)
	{
		if((ttab[x].kind == arrays) && (ttab[y].kind == arrays) && 
			(ttab[x].low == ttab[y].low) && (ttab[x].high == ttab[y].high))
			mustbe(ttab[x].elemtip, ttab[y].elemtip);
		else
			error(107);/*类型不匹配*/
	}
}
void expression(int * x);
void selector(int * t, int * ref)
{
	int j, x;
	*t = itab[*ref].tip;
	getsym();
	if(sym == period ||sym == lbrack)
	{
		addressvar(*ref);
		*ref = 0;
		while(sym == period || sym == lbrack)
		{
			switch(sym)
			{
			case period:
				if(ttab[*t].kind != records)
					error(108);
				else
				{
					getsym();
					check(ident);
					j = ttab[*t].fields;
					strcpy(itab[0].name, id);
					while(strcmp(itab[0].name, id) != 0)
						j = itab[j].link;
					if(j == 0)
						error(109);
					else
					{
						gen1(addc, itab[j].offset);
						*t = itab[j].tip;
						getsym();
					}
				}
				break;
			case lbrack:
				do{
					if(ttab[*t].kind != arrays)
						error(110);
					else
					{
						getsym();
						expression(&x);
						mustbe(intip, x);
						gen1(addc, -(ttab[*t].low));
						*t = ttab[*t].elemtip;
						gen1(mulc, ttab[*t].size);
						gen0(add);
					}
				}while(sym == comma);
				skip(rbrack);
				break;
			}
		}
	}
}
void varpar(int * t)
{
	int j;
	check(ident);
	j = position();
	selector(t, &j);
	if(j != 0)
		addressvar(j);
}
/*标准函数*/
void standfct(int n)
{
	int x, l;
	switch(n)
	{
	case fabs:
		skip(lparen);
		expression(&x);
		mustbe(intip, x);
		gen0(dupl);
		gen1(ldc, 0);
		gen0(lssi);
		l = codelabel();
		gen1(jumpz, 0);
		gen0(neg);
		code[l].a = codelabel();
		skip(rparen);
		break;
	case fsqr:
		skip(lparen);
		expression(&x);
		mustbe(intip, x);
		gen0(dupl);
		gen0(mul);
		skip(rparen);
		break;
	case fodd:
		skip(lparen);
		expression(&x);
		mustbe(intip, x);
		gen0(rem2);
		skip(rparen);
		break;
	case fchr:
		skip(lparen);
		expression(&x);
		mustbe(intip, x);
		skip(rparen);
		break;
	case ford:
		skip(lparen);
		expression(&x);
		mustbe(chartip, x);
		skip(rparen);
		break;
	case fwrite:
	case fwriteln:
		if(n == fwrite)
			check(lparen);
		if(sym == lparen)
		{
			do{
				getsym();
				if(sym == sstring)
				{
					for(x = 0; x < slen; x++)
					{
						gen1(ldc, str[x]);
						gen0(wrc);
					}
					getsym();
				}
				else
				{
					expression(&x);
					if(sym == colon)
					{
						mustbe(intip, x);
						getsym();
						expression(&x);
						mustbe(intip, x);
						gen0(wri);
					}
					else if(x == intip)
					{
						gen1(ldc, 8);
						gen0(wri);
					}
					else if(x == chartip)
					{
						gen0(wrc);
					}
					else
						error(111);
				}
			}while(sym == comma);
			skip(rparen);
		}
		if(n == fwriteln)
			gen0(wrl);
		break;
	case fread:
	case freadln:
		if(n == fread)
			check(lparen);
		if(sym == lparen)
		{
			do{
				getsym();
				varpar(&x);
				if(x == intip)
					gen0(rdi);
				else if(x == chartip)
					gen0(rdc);
				else
					error(112);
			}while(sym == comma);
			skip(rparen);
		}
		if(n == freadln)
			gen0(rdl);
		break;
	case feoln:
		gen0(eol);
		break;
	}
}
/*函数,过程调用*/
void funcall(int i)
{
	int d, p, x;
	getsym();
	if(itab[i].flevel < 0)
		standfct(itab[i].fadr);
	else
	{
		if(itab[i].tip != 0)
			gen1(ldc, 0);
		p = i;
		d = dx;
		if(sym == lparen)
		{
			do{
				getsym();
				if(p == itab[i].lastpar)
					error(113);
				else
				{
					p = p + 1;
					if(itab[p].refpar == TRUE)
						varpar(&x);
					else
					{
						expression(&x);
						if(ttab[x].kind != simple)
							gen1(copy, ttab[x].size);
					}
				}
				mustbe(itab[p].tip, x);
			}while(sym == comma);
			skip(rparen);
		}
		if(p != itab[i].lastpar)
			error(114);
		if(itab[i].flevel != 0)
			address(itab[i].flevel, 0);
		gen1(call, itab[i].fadr);
		dx = d;
	}
}
/*因子*/
void factor(int * t)
{
	int i;
	if(sym == ident)
	{
		i = position();
		*t = itab[i].tip;
		switch(itab[i].kind)
		{
		case konst:
			getsym();
			gen1(ldc, itab[i].val);
			break;
		case varbl:
			selector(t, &i);
			if(i != 0)
				addressvar(i);
			if(ttab[i].kind == simple)
				gen0(load);
			break;
		case funkt:
			if(*t == 0)
				error(115);
			else
				funcall(i);
			break;
		case tipe:
			error(116); /*类型名不能作为因子*/
			break;
		}
	}
	else if(sym == number)
	{
		gen1(ldc, num);
		*t = intip;
		getsym();
	}
	else if(sym == sstring && slen == 2)
	{
		gen1(ldc, str[0]);
		*t = chartip;
		getsym();
	}
	else if(sym == lparen)
	{
		getsym();
		expression(t);
		skip(rparen);
	}
	else if(sym == notsym)
	{
		getsym();
		factor(t);
		mustbe(booltip, *t);
		gen0(neg);
		gen1(addc, 1);
	}
	else
		error(117);
}
/*表达式的项*/
void term(int * x)
{
	int y;
	factor(x);
	while(sym == andsym || sym == star || sym == divsym || sym == modsym)
	{
		if(sym == andsym)
			mustbe(booltip, *x);
		else
			mustbe(intip, *x);
		switch(sym)
		{
		case star:
			getsym();
			factor(&y);
			gen0(mul);
			break;
		case divsym:
			getsym();
			factor(&y);
			gen0(divd);
			break;
		case modsym:
			getsym();
			factor(&y);
			gen0(remd);
			break;
		case andsym:
			getsym();
			factor(&y);
			gen0(andb);
			break;
		}
		mustbe(*x, y);
	}
}
/*简单表达式*/
void simpleexpression(int * x)
{
	int y;
	if(sym == plus)
	{
		getsym();
		term(x);
		mustbe(intip, *x);
	}
	else if(sym == minus)
	{
		getsym();
		term(x);
		mustbe(intip, *x);
		gen0(neg);
	}
	else
		term(x);
	while(sym == orsym || sym == plus || sym == minus)
	{
		if(sym == orsym)
			mustbe(booltip, *x);
		else
			mustbe(intip, *x);
		switch(sym)
		{
		case plus:
			getsym();
			term(&y);
			gen0(add);
			break;
		case minus:
			getsym();
			term(&y);
			gen0(neg);
			gen0(add);
			break;
		case orsym:
			getsym();
			term(&y);
			gen0(orb);
			break;
		}
		mustbe(*x, y);
	}
}
/*表达式*/
void expression(int * x)
{
	symbol op;
	int y;
	simpleexpression(x);
	if(sym == eql ||sym == neq || sym == lss || sym == leq || sym == gtr ||sym == geq)
	{
		if(ttab[*x].kind != simple)
			error(118);
		else
		{
			op = sym;
			getsym();
			simpleexpression(&y);
			mustbe(*x, y);
			switch(op)
			{
			case eql:
				gen0(eqli);
				break;
			case neq:
				gen0(neqi);
				break;
			case lss:
				gen0(lssi);
				break;
			case leq:
				gen0(leqi);
				break;
			case gtr:
				gen0(gtri);
				break;
			case geq:
				gen0(geqi);
				break;
			}
			*x = booltip;
		}
	}
}
/*语句*/
void statement()
{
	int i, j, t, x;
	if(sym == ident)
	{
		i = position();
		switch(itab[i].kind)
		{
		case varbl:
			selector(&t, &i);
			skip(becomes);
			expression(&x);
			mustbe(t, x);
			if(i == 0)
				gen0(swap);
			else
				addressvar(i);
			if(ttab[i].kind == simple)
				gen0(stor);
			else
				gen1(move, ttab[i].size);
			break;
		case funkt:
			if(itab[i].tip == 0)
				funcall(i);
			else
			{
				if(itab[i].inside == FALSE)
					error(119);/*此处不能对函数赋值*/
				else
				{
					getsym();
					skip(becomes);
					expression(&x);
					mustbe(itab[i].tip, x);
					address(itab[i].flevel + 1, itab[i].resultadr);
					gen0(stor);
				}
			}
			break;
		case konst:
		case field:
		case tipe:
			error(120); /*变量不能用在此处*/
			break;
		}
	}
	else if(sym == ifsym)
	{
		getsym();
		expression(&t);
		mustbe(booltip, t);
		skip(thensym);
		i = codelabel();
		gen1(jumpz, 0);
		statement();
		if(sym == elsesym)
		{
			getsym();
			j = codelabel();
			gen1(jump, 0);
			code[i].a = codelabel();
			i = j;
			statement();
		}
		code[i].a = codelabel();
	}
	else if(sym == whilesym)
	{
		getsym();
		i = codelabel();
		expression(&t);
		mustbe(booltip, t);
		skip(dosym);
		j = codelabel();
		gen1(jumpz, 0);
		statement();
		gen1(jump, i);
		/*这里表写错了*/
		code[j].a = codelabel();
	}
	else if(sym == repeatsym)
	{
		i = codelabel();
		do{
			getsym();
			statement();
		}while(sym == semicolon);
		skip(untilsym);
		expression(&t);
		mustbe(booltip, t);
		gen1(jumpz, i);
	}
	else if(sym == beginsym)
	{
		do{
			getsym();
			statement();
		}while(sym == semicolon);
		skip(endsym);
	}
}
void block(int l);
/*常量*/
void constant(int * c, int * t)
{
	int i, s;
	if(sym == sstring && slen == 2)
	{
		*c = str[0];
		*t = chartip;
	}
	else
	{
		if(sym == plus)
		{
			getsym();
			s = +1;
		}
		else if(sym == minus)
		{
			getsym();
			s = -1;
		}
		else
			s = 0;
		if(sym == ident)
		{
			i = position();
			if(itab[i].kind != konst)
				error(121);
			else
			{
				*c = itab[i].val;
				*t = itab[i].tip;
			}
		}
		else if(sym == number)
		{
			*c = num;
			*t = intip;
		}
		else
			error(122);
		if(s != 0)
		{
			mustbe(*t, intip);
			(*c) = (*c) * (s);
		}
	}
	getsym();
}
/*常量声明*/
void constdeclaration()
{
	alfa a;
	int t, c;
	strcpy(a, id);
	getsym();
	skip(eql);
	constant(&c, &t);
	skip(semicolon);
	enter(a, konst, t);
	itab[ix].val = c;
}
void typ(int * t);
/*数组类型*/
void arraytyp(int * t)
{
	int x;
	ttab[*t].kind = arrays;
	getsym();
	constant(&(ttab[*t].low), &x);
	mustbe(intip, x);
	skip(colon);
	constant(&(ttab[*t].high), &x);
	mustbe(intip, x);
	if(ttab[*t].low > ttab[*t].high)
		error(123); /*数组边界问题*/
	if(sym == comma)
		arraytyp(&(ttab[*t].elemtip));
	else
	{
		skip(rbrack);
		skip(ofsym);
		typ(&(ttab[*t].elemtip));
	}
	ttab[*t].size = (ttab[*t].high - ttab[*t].low + 1) * ttab[ttab[*t].elemtip].size;
}
/*类型定义*/
void typ(int * t)
{
	int i, j, sz, ft;
	if(sym == ident)
	{
		i = position();
		if(itab[i].kind != tipe)
			error(124); /*这个标识符不是类型能够*/
		else
		{
			*t = itab[i].tip;
			getsym();
		}
	}
	else
	{
		if(tx == tmax)
		{
			error(125); /*溢出,应退出*/
		}
		else
		{
			tx = tx + 1;
			*t = tx;
		}
		if(sym == arraysym)
		{
			getsym();
			check(lbrack);
			arraytyp(t);
		}
		else
		{
			skip(recordsym);
			if(lev == lmax)
			{
				error(126); /*深度超过限度,应退出*/
			}
			else
			{
				lev = lev + 1;
				if(lev == -1)
					nl = 0;
				else
					namelist[lev] = 0;
				check(ident);
				sz = 0;
				do{
					enter(id, field, 0);
					i = ix;
					getsym();
					while(sym == comma)
					{
						getsym();
						check(ident);
						enter(id, field, 0);
						getsym();
					}
					j = ix;
					skip(colon);
					typ(&ft);
					do{
						itab[i].tip = ft;
						itab[i].offset = sz;
						sz = sz + ttab[ft].size;
						i = i + 1;
					}while(i <= j);
					if(sym == semicolon)
						getsym();
					else
						check(endsym);
				}while(sym == ident);
				ttab[*t].size = sz;
				ttab[*t].kind = records;
				if(lev == -1)
					ttab[*t].fields = nl;
				else
					ttab[*t].fields = namelist[lev];
				lev = lev - 1;
				skip(endsym);
			}
		}
	}
}
/*类型声明,type保留字处*/
void typedeclaration()
{
	alfa a;
	int t;
	strcpy(a, id);
	getsym();
	skip(eql);
	typ(&t);
	skip(semicolon);
	enter(a, tipe, t);
}
/*变量声明*/
void vardeclaration()
{
	int p, q, t;
	enter(id, varbl, 0);
	p = ix;
	getsym();
	while(sym == comma)
	{
		getsym();
		check(ident);
		enter(id, varbl, 0);
		getsym();
	}
	q = ix;
	skip(colon);
	typ(&t);
	skip(semicolon);
	do{
		itab[p].vlevel = lev;
		dx = dx - ttab[t].size;
		itab[p].tip = t;
		itab[p].vadr = dx;
		itab[p].refpar = FALSE;
		p = p + 1;
	}while(p <= q); 
}
/*参数列表*/
void paramlist(int *p, int * ps)
{
	BOOL r;
	int t;
	if(sym == varsym)
	{
		r = TRUE;
		getsym();
	}
	else
		r = FALSE;
	check(ident);
	*p = ix;
	enter(id, varbl, 0);
	getsym();
	while(sym == comma)
	{
		getsym();
		check(ident);
		enter(id, varbl, 0);
		getsym();
	}
	skip(colon);
	check(ident);
	typ(&t);
	while(*p < ix)
	{
		*p = *p + 1;
		itab[*p].tip = t;
		itab[*p].refpar = r;
		if(r)
			*ps = *ps + 1; /*传地址*/
		else
			*ps = *ps + ttab[t].size; /*传值*/
	}
}
void funcdeclaration(BOOL isf)
{
	int f, p, ps, odx;
	getsym();
	check(ident);
	enter(id, funkt, 0);
	getsym();
	f = ix;
	itab[f].flevel = lev;
	itab[f].fadr = codelabel();
	gen1(jump, 0);
	if(lev == lmax)
	{
		error(127); /*深度超过限度,应退出*/
	}
	lev = lev + 1;
	if(lev == -1)
		nl = 0;
	else
		namelist[lev] = 0;
	ps = 1;
	odx = dx;
	if(sym == lparen)
	{
		do{
			getsym();
			paramlist(&p, &ps);
		}while(sym == semicolon);
		skip(rparen);
	}
	if(lev > 1)
		dx = -1;
	else
		dx = 0;
	itab[f].resultadr = ps;
	p = f;
	while(p < ix)
	{
		p = p + 1;
		if(itab[p].refpar)
			ps = ps - 1;
		else
			ps = ps - ttab[itab[p].tip].size;
		itab[p].vlevel = lev;
		itab[p].vadr = ps;
	}
	if(isf == TRUE)
	{
		skip(colon);
		check(ident);
		typ(&(itab[f].tip));
		if(ttab[itab[f].tip].kind != simple)
			error(128); /*只能返回简单类型*/
	}
	skip(semicolon);
	itab[f].lastpar = ix;
	itab[f].inside = TRUE;
	block(itab[f].fadr);
	itab[f].inside = FALSE;
	gen1(pexit, itab[f].resultadr - dx);
	lev = lev - 1;
	dx = odx;
	skip(semicolon);
}
void block(int l)
{
	int d, odx, oix;
	odx = dx;
	oix = ix;
	if(sym == constsym)
	{
		getsym();
		check(ident);
		do{
			constdeclaration();
		}while(sym == ident);
	}
	if(sym == typesym)
	{
		getsym();
		check(ident);
		do{
			typedeclaration();
		}while(sym == ident);
	}
	if(sym == varsym)
	{
		getsym();
		check(ident);
		do{
			vardeclaration();
		}while(sym == ident);
	}
	while(sym == funcsym || sym == procsym)
	{
		if(sym == funcsym)
			funcdeclaration(TRUE);
		else
			funcdeclaration(FALSE);
	}
	if(l + 1 == codelabel())
		cx = cx -1;
	else
		code[l].a = codelabel();
	if(lev == 0)
		gen1(sets, dx);
	else
	{
		d = dx - odx;
		dx = odx;
		gen1(adjs, d);
	}
	statement();
	if(lev != 0)
		gen1(adjs, odx - dx);
	ix = oix;
}
void listcode(FILE * fi)
{
	int i;
	for(i = 0; i < cx; i++)
	{
		fprintf(fi, "%-4d :   ", i);
		switch(code[i].op)
		{
		case add:
			fprintf(fi, "add/n");
			break;
		case neg:
			fprintf(fi, "neg/n");
			break;
		case mul:
			fprintf(fi, "mul/n");
			break;
		case divd:
			fprintf(fi, "divd/n");
			break;
		case remd:
			fprintf(fi, "remd/n");
			break;
		case div2:
			fprintf(fi, "div2/n");
			break;
		case rem2:
			fprintf(fi, "rem2/n");
			break;
		case eqli:
			fprintf(fi, "eqli/n");
			break;
		case neqi:
			fprintf(fi, "neqi/n");
			break;
		case lssi:
			fprintf(fi, "lssi/n");
			break;
		case leqi:
			fprintf(fi, "leqi/n");
			break;
		case gtri:
			fprintf(fi, "gtri/n");
			break;
		case geqi:
			fprintf(fi, "geqi/n");
			break;
		case dupl:
			fprintf(fi, "dupl/n");
			break;
		case swap:
			fprintf(fi, "swap/n");
			break;
		case andb:
			fprintf(fi, "andb/n");
			break;
		case orb:
			fprintf(fi, "orb/n");
			break;
		case load:
			fprintf(fi, "load/n");
			break;
		case stor:
			fprintf(fi, "stor/n");
			break;
		case hhalt:
			fprintf(fi, "hhalt/n");
			break;
		case wri:
			fprintf(fi, "wri/n");
			break;
		case wrc:
			fprintf(fi, "wrc/n");
			break;
		case wrl:
			fprintf(fi, "wrl/n");
			break;
		case rdi:
			fprintf(fi, "rdi/n");
			break;
		case rdc:
			fprintf(fi, "rdc/n");
			break;
		case rdl:
			fprintf(fi, "rdl/n");
			break;
		case eol:
			fprintf(fi, "eol/n");
			break;
		case ldc:
			fprintf(fi, "ldc   %d/n", code[i].a);
			break;
		case ldla:
			fprintf(fi, "ldla  %d/n", code[i].a);
			break;
		case ldl:
			fprintf(fi,"ldl   %d/n", code[i].a);
			break;
		case ldg:
			fprintf(fi, "ldg   %d/n", code[i].a);
			break;
		case stl:
			fprintf(fi, "stl   %d/n", code[i].a);
			break;
		case stg:
			fprintf(fi, "stg   %d/n", code[i].a);
			break;
		case move:
			fprintf(fi, "move  %d/n", code[i].a);
			break;
		case copy:
			fprintf(fi, "copy  %d/n", code[i].a);
			break;
		case addc:
			fprintf(fi, "addc  %d/n", code[i].a);
			break;
		case mulc:
			fprintf(fi, "mulc  %d/n", code[i].a);
			break;
		case jump:
			fprintf(fi, "jump  %d/n", code[i].a);
			break;
		case jumpz:
			fprintf(fi, "jumpz %d/n", code[i].a);
			break;
		case call:
			fprintf(fi, "call  %d/n", code[i].a);
			break;
		case adjs:
			fprintf(fi, "adjs  %d/n", code[i].a);
			break;
		case sets:
			fprintf(fi, "sets  %d/n", code[i].a);
			break;
		case pexit:
			fprintf(fi, "exit  %d/n", code[i].a);
			break;
		}
	}
}
void compile()
{
	ttab[intip].size = 1;
	ttab[intip].kind = simple;
	ttab[chartip].size = 1;
	ttab[chartip].kind = simple;
	ttab[booltip].size = 1;
	ttab[booltip].kind = simple;
	tx = 3;
	nl = 0; /* namelist[-1] = 0; */
	lev = -1;
	ix = 0;
	enter("false", konst, booltip);
	itab[ix].val = FALSE;
	enter("true", konst, booltip);
	itab[ix].val = TRUE;
	enter("maxint", konst, intip);
	itab[ix].val = 32767;
	enter("integer", tipe, intip);
	enter("char", tipe, chartip);
	enter("boolean", tipe, booltip);
	enter("abs", funkt, intip);
	itab[ix].flevel = -1;
	itab[ix].fadr = fabs;
	itab[ix].inside = FALSE;
	enter("sqr", funkt, intip);
	itab[ix].flevel = -1;
	itab[ix].fadr = fsqr;
	itab[ix].inside = FALSE;
	enter("odd", funkt, booltip);
	itab[ix].flevel = -1;
	itab[ix].fadr = fodd;
	itab[ix].inside = FALSE;
	enter("chr", funkt, chartip);
	itab[ix].flevel = -1;
	itab[ix].fadr = fchr;
	itab[ix].inside = FALSE;
	enter("ord", funkt, intip);
	itab[ix].flevel = -1;
	itab[ix].fadr = ford;
	itab[ix].inside = FALSE;
	enter("write", funkt, 0);
	itab[ix].flevel = -1;
	itab[ix].fadr = fwrite;
	enter("writeln", funkt, 0);
	itab[ix].flevel = -1;
	itab[ix].fadr = fwriteln;
	enter("read", funkt, 0);
	itab[ix].flevel = -1;
	itab[ix].fadr = fread;
	enter("readln", funkt, 0);
	itab[ix].flevel = -1;
	itab[ix].fadr = freadln;
	enter("eoln", funkt, booltip);
	itab[ix].flevel = -1;
	itab[ix].fadr = feoln;
	itab[ix].inside = FALSE;
	namelist[0] = 0;
	lev = 0;
	cc = 0;
	ll = 0;
	getch();
	getsym();
	labeled = FALSE;
	cx = 0;
	dx = amax + 1;
	skip(progsym);
	skip(ident);
	check(lparen);
	do{
		getsym();
		check(ident);
		if(strcmp(id, "input") != 0 && strcmp(id, "output") != 0)
			error(129);
		getsym();
	}while(sym == comma);
	skip(rparen);
	skip(semicolon);
	gen1(jump, 0);
	block(0);
	gen0(hhalt);
	check(period);
}
/*解释执行*/
void interpret()
{
	int pc, sp, j, k, n;
	instr i;
	char c;
	BOOL h;
	pc = 0;
	h = FALSE;
	do{
		i = code[pc];
		pc = pc + 1;
		switch(i.op)
		{
		case add:
			m[sp + 1] = m[sp + 1] + m[sp];
			sp = sp + 1;
			break;
		case neg:
			m[sp] = -m[sp];
			break;
		case mul:
			m[sp + 1] = m[sp + 1] * m[sp];
			sp = sp + 1;
			break;
		case divd:
			m[sp + 1] = m[sp + 1] / m[sp];
			sp = sp + 1;
			break;
		case remd:
			m[sp + 1] = m[sp + 1] % m[sp];
			sp = sp + 1;
			break;
		case div2:
			m[sp] = m[sp] / 2;
			break;
		case rem2:
			m[sp] = m[sp] % 2;
			break;
		case eqli:
			m[sp + 1] = (m[sp + 1] == m[sp]);
			sp = sp + 1;
			break;
		case neqi:
			m[sp + 1] = (m[sp + 1] != m[sp]);
			sp = sp + 1;
			break;
		case lssi:
			m[sp + 1] = (m[sp + 1] < m[sp]);
			sp = sp + 1;
			break;
		case leqi:
			m[sp + 1] = (m[sp + 1] <= m[sp]);
			sp = sp + 1;
			break;
		case gtri:
			m[sp + 1] = (m[sp + 1] > m[sp]);
			sp = sp + 1;
			break;
		case geqi:
			m[sp + 1] = (m[sp + 1] >= m[sp]);
			sp = sp + 1;
			break;
		case dupl:
			sp = sp - 1;
			m[sp] = m[sp + 1];
			break;
		case swap:
			k = m[sp];
			m[sp] = m[sp + 1];
			m[sp + 1] = k;
			break;
		case andb:
			if(m[sp] == 0)
				m[sp + 1] = 0;
			sp = sp + 1;
			break;
		case orb:
			if(m[sp] == 1)
				m[sp + 1] = 1;
			sp = sp + 1;
			break;
		case load:
			m[sp] = m[m[sp]];
			break;
		case stor:
			m[m[sp]] = m[sp + 1];
			sp = sp + 2;
			break;
		case hhalt:
			h = TRUE;
			break;
		case wri:
			/*待定*/
			fprintf(stdout, "%d", m[sp + 1]);
			sp = sp + 2;
			break;
		case wrc:
			fprintf(stdout, "%c", m[sp]);
			sp = sp + 1;
			break;
		case wrl:
			fprintf(stdout, "/n");
			break;
		case rdi:
			fprintf(stdout, "input integer: ");
			fscanf(stdin, "%d", &(m[m[sp]]));
			sp = sp + 1;
			break;
		case rdc:
			fprintf(stdout, "input character: ");
			fscanf(stdin, "%c", &c);
			m[m[sp]] = c;
			sp = sp + 1;
			break;
		case rdl:
			/*待定*/
			break;
		case eol:
			sp = sp - 1;
			m[sp] = feof(stdin);
			break;
		case ldc:
			sp = sp - 1;
			m[sp] = i.a;
			break;
		case ldla:
			sp = sp - 1;
			m[sp] = sp + 1 + i.a;
			break;
		case ldl:
			sp = sp - 1;
			m[sp] = m[sp + 1 + i.a];
			break;
		case ldg:
			sp = sp - 1;
			m[sp] = m[i.a];
			break;
		case stl:
			m[sp + i.a] = m[sp];
			sp = sp + 1;
			break;
		case stg:
			m[i.a] = m[sp];
			sp = sp + 1;
			break;
		case move:
			k = m[sp];
			j = m[sp + 1];
			sp = sp + 2;
			n = i.a;
			do{
				n = n - 1;
				m[k + n] = m[j + n];
			}while(n > 0);
			break;
		case copy:
			j = m[sp];
			n = i.a;
			sp = sp - n + 1;
			do{
				n = n - 1;
				m[sp + n] = m[j + n];
			}while(n > 0);
			break;
		case addc:
			m[sp] = m[sp] + i.a;
			break;
		case mulc:
			m[sp] = m[sp] * i.a;
			break;
		case jump:
			pc = i.a;
			break;
		case jumpz:
			if(m[sp] == 0)
				pc = i.a;
			sp = sp + 1;
			break;
		case call:
			sp = sp - 1;
			m[sp] = pc;
			pc = i.a;
			break;
		case adjs:
			sp = sp + i.a;
			break;
		case sets:
			sp = i.a;
			break;
		case pexit:
			pc = m[sp];
			sp = sp + i.a;
			break;
		}
	}while(h == FALSE);
}
void main(int argc, char **argv)
{
	char filename[81], save;
	FILE * sf;
	memset(filename, 0, 81);
	if(argc == 1)
	{
		fprintf(stdout, "please enter source file name: ");
		fscanf(stdin, "%s", filename);
	}
	else
		strcpy(filename, argv[1]);
	source = fopen(filename, "r");
	if(source == NULL)
	{
		fprintf(stderr, "cann't open file: %s/n", filename);
		return;
	}
	fprintf(stdout, "compiling.../n");
	compile();
	fclose(source);
	fprintf(stdout, "no errors, compile succeed./n");
	fprintf(stdout, "--------------------------------/n");
	listcode(stdout);
	fprintf(stdout, "--------------------------------/n");
	fprintf(stdout, "Run>/n");
	interpret();
	fprintf(stdout, "program exit./n");
	fprintf(stdout, "do you want to save the code(y or n): ");
	do{
		scanf("%c", &save);
	}while(save != 'y' && save != 'Y' && save != 'n' && save != 'N');
	if(save == 'y' || save == 'Y')
	{
		fprintf(stdout, "enter file name: ");
		scanf("%s", filename);
		sf = fopen(filename, "w");
		if(sf)
		{
			listcode(sf);
			fclose(sf);
		}
		else
			fprintf(stdout, "open file error, code not saved./n");
	}
}
发表在 代码片段, 编译原理 | PL0文法编译器C语言源代码已关闭评论

In-Stream Big Data Processing

In-Stream Big Data Processing

 

The shortcomings and drawbacks of batch-oriented data processing were widely recognized by the Big Data community quite a long time ago. It became clear that real-time query processing and in-stream processing is the immediate need in many practical applications. In recent years, this idea got a lot of traction and a whole bunch of solutions like Twitter’s Storm, Yahoo’s S4, Cloudera’s Impala, Apache Spark, and Apache Tez appeared and joined the army of Big Data and NoSQL systems. This article is an effort to explore techniques used by developers of in-stream data processing systems, trace the connections of these techniques to massive batch processing and OLTP/OLAP databases, and discuss how one unified query engine can support in-stream, batch, and OLAP processing at the same time.

At Grid Dynamics, we recently faced a necessity to build an in-stream data processing system that aimed to crunch about 8 billion events daily providing fault-tolerance and strict transactioanlity i.e. none of these events can be lost or duplicated. This system has been designed to supplement and succeed the existing Hadoop-based system that had too high latency of data processing and too high maintenance costs. The requirements and the system itself were so generic and typical that we describe it below as a canonical model, just like an abstract problem statement.

A high-level overview of the environment we worked with is shown in the figure below:

cover-2

One can see that this environment is a typical Big Data installation: there is a set of applications that produce the raw data in multiple datacenters, the data is shipped by means of Data Collection subsystem to HDFS located in the central facility, then the raw data is aggregated and analyzed using the standard Hadoop stack (MapReduce, Pig, Hive) and the aggregated results are stored in HDFS and NoSQL, imported to the OLAP database and accessed by custom user applications. Our goal was to equip all facilities with a new in-stream engine (shown in the bottom of the figure) that processes most intensive data flows and ships the pre-aggregated data to the central facility, thus decreasing the amount of raw data and heavy batch jobs in Hadoop. The design of the in-stream processing engine itself was driven by the following requirements:

  • SQL-like functionality. The engine has to evaluate SQL-like queries continuously, including joins over time windows and different aggregation functions that implement quite complex custom business logic. The engine can also involve relatively static data (admixtures) loaded from the stores of Aggregated Data. Complex multi-pass data mining algorithms are beyond the immediate goals.
  • Modularity and flexibility. It is not to say that one can simply issue a SQL-like query and the corresponding pipeline will be created and deployed automatically, but it should be relatively easy to assemble quite complex data processing chains by linking one block to another.
  • Fault-tolerance. Strict fault-tolerance is a principal requirement for the engine. As it sketched in the bottom part of the figure, one possible design of the engine is to use distributed data processing pipelines that implement operations like joins and aggregations or chains of such operations, and connect these pipelines by means of fault-tolerant persistent buffers. These buffers also improve modularity of the system by enabling publish/subscribe communication style and easy addition/removal of the pipelines. The pipelines can be stateful and the engine’s middleware should provide a persistent storage to enable state checkpointing. All these topics will be discussed in the later sections of the article.
  • Interoperability with Hadoop. The engine should be able to ingest both streaming data and data from Hadoop i.e. serve as a custom query engine atop of HDFS.
  • High performance and mobility. The system should deliver performance of tens of thousands messages per second even on clusters of minimal size. The engine should be compact and efficient, so one can deploy it in multiple datacenters on small clusters.

To find out how such a system can be implemented, we discuss the following topics in the rest of the article:

  • First, we explore relations between in-stream data processing systems, massive batch processing systems, and relational query engines to understand how in-stream processing can leverage a huge number of techniques that were devised for other classes of systems.
  • Second, we describe a number of patterns and techniques that are frequently used in building of in-stream processing frameworks and systems. In addition, we survey the current and emerging technologies and provide a few implementation tips.

The article is based on a research project developed at Grid Dynamics Labs. Much of the credit goes to Alexey Kharlamov and Rafael Bagmanov who led the project and other contributors: Dmitry Suslov, Konstantine Golikov, Evelina Stepanova, Anatoly Vinogradov, Roman Belous, and Varvara Strizhkova.

Basics of Distributed Query Processing

It is clear that distributed in-stream data processing has something to do with query processing in distributed relational databases. Many standard query processing techniques can be employed by in-stream processing engine, so it is extremely useful to understand classical algorithms of distributed query processing and see how it all relates to in-stream processing and other popular paradigms like MapReduce.

Distributed query processing is a very large area of knowledge that was under development for decades, so we start with a brief overview of the main techniques just to provide a context for further discussion.

Partitioning and Shuffling

Distributed and parallel query processing heavily relies on data partitioning to break down a large data set into multiple pieces that can be processed by independent processors. Query processing could consist of multiple steps and each step could require its own partitioning strategy, so data shuffling is an operation frequently performed by distributed databases.

Although optimal partitioning for selection and projection operations can be tricky (e.g. for range queries), we can assume that for in-stream data filtering it is practically enough to distribute data among the processors using a hash-based partitioning.

Processing of distributed joins is not so easy and requires a more thorough examination. In distributed environments, parallelism of join processing is achieved through data partitioning, i.e. the data is distributed among processors and each processor employs a serial join algorithm (e.g. nested-loop join or sort-merge join or hash-based join) to process its part of the data. The final results are consolidated from the results obtained from different processors.

There are two main data partitioning techniques that can be employed by distributed join processing:

  • Disjoint data partitioning
  • Divide and broadcast join

Disjoint data partitioning technique shuffles the data into several partitions in such a way that join keys in different partitions do not overlap. Each processor performs the join operation on each of these partitions and the final result is obtained as a simple concatenation of the results obtained from different processors.  Consider an example where relation R is joined with relation S on a numerical key k and a simple modulo-based hash function is used to produce the partitions (it is assumes that the data initially distributed among the processors based on some other policy):

disjoint-partitioning

The divide and broadcast join algorithm is illustrated in the figure below. This method divides the first data set into multiple disjoint partitions (R1, R2, and R3 in the figure) and replicates the second data set to all processors. In a distributed database, division typically is not a part of the query processing itself because data sets are initially distributed among multiple nodes.

broadcast-join

This strategy is applicable for joining of a large relation with a small relation or two small relations. In-stream data processing systems can employ this technique for stream enrichment i.e. joining a static data (admixture) to a data stream.

Processing of GroupBy queries also relies on shuffling and fundamentally similar to the MapReduce paradigm in its pure form.  Consider an example where the data is grouped by a string key and sum of the numerical values is computed in each group:

group-by-query

In this example, computation consists of two steps: local aggregation and global aggregation. These steps basically correspond to Map and Reduce operations. Local aggregation is optional and raw records can be emitted, shuffled, and aggregated on a global aggregation phase.

The whole point of this section is that all the algorithms above can be naturally implemented using a message passing architectural style i.e. the query execution engine can be considered as a distributed network of nodes connected by the messaging queues. It is conceptually similar to the in-stream processing pipelines.

Pipelining

In the previous section, we noted that many distributed query processing algorithms resemble message passing networks. However, it is not enough to organize efficient in-stream processing: all operators in a query should be chained in such a way that the data flows smoothly through the entire pipeline i.e. neither operation should block processing by waiting for a large piece of input data without producing any output or by writing intermediate results on disk. Some operations like sorting are inherently incompatible with this concept (obviously, a sorting block cannot produce any output until the entire input is ingested), but in many cases pipelining algorithms are applicable.  A typical example of pipelining is shown below:

join-pipeline

In this example, the hash join algorithm is employed to join four relations: R1, S1, S2, and S3 using 3 processors. The idea is to build hash tables for S1, S2 and S3 in parallel and then stream R1 tuples one by one though the pipeline that joins them with S1, S2 and S3 by looking up matches in the hash tables. In-stream processing naturally employs this technique to join a data stream with the static data (admixtures).

In relational databases, join operation can take advantage of pipelining by using the symmetric hash join algorithm or some of its advanced variants [1,2]. Symmetric hash join is a generalization of hash join. Whereas a normal hash join requires at least one of its inputs to be completely available to produce first results (the input is needed to build a hash table), symmetric hash join is able to produce first results immediately. In contrast to the normal hash join, it maintains hash tables for both inputs and populates these tables as tuples arrive:

symmetric-join

As a tuple comes in, the joiner first looks it up in the hash table of the other stream. If match is found, an output tuple is produced. Then the tuple is inserted in its own hash table.

However, it does not make a lot of sense to perform a complete join of infinite streams. In many cases join is performed on a finite time window or other type of buffer e.g. LFU cache that contains most frequent tuples in the stream. Symmetric hash join can be employed if the buffer is large comparing to the stream rate or buffer is flushed frequently according to some application logic or buffer eviction strategy is not predictable. In other cases, simple hash join is often sufficient since the buffer is constantly full and does not block the processing:

stream-join

It is worth noting that in-stream processing often deals with sophisticated stream correlation algorithms where records are matched based on scoring metrics, not on field equality condition. A more complex system of buffers can be required for both streams in such cases.

In-Stream Processing Patterns

In the previous section, we discussed a number of standard query processing techniques that can be used in massively parallel stream processing. Thus, on a conceptual level, an efficient query engine in a distributed database can act as a stream processing system and vice versa, a stream processing system can act as a distributed database query engine. Shuffling and pipelining are the key techniques of distributed query processing and message passing networks can naturally implement them. However, things are not so simple. In a contrast to database query engines where reliability is not critical because a read-only query can always be restarted, streaming systems should pay a lot of attention to reliable events processing. In this section, we discuss a number of techniques that are used by streaming systems to provide message delivery guarantees and some other patterns that are not typical for standard query processing.

Stream Replay

Ability to rewind data stream back in time and replay the data is very important for in-stream processing systems because of the following reasons:

  • This is the only way to guarantee correct data processing. Even if data processing pipeline is fault-tolerant, it is very problematic to guarantee that the deployed processing logic is defect-free. One can always face a necessity to fix and redeploy the system and replay the data on a new version of the pipeline.
  • Issue investigation could require ad hoc queries. If something goes wrong, one could need to rerun the system on the problematic data with better logging or with code alternations.
  • Although it is not always the case, the in-stream processing system can be designed in such a way that it re-reads individual messages from the source in case of processing errors and local failures, even if the system in general is fault-tolerant.

As a result, the input data typically goes from the data source to the in-stream pipeline via a persistent buffer that allows clients to move their reading pointers back and forth.

replay-buffer

Kafka messaging queue is well known implementation of such a buffer that also supports scalable distributed deployments, fault-tolerance, and provides high performance.

As a bottom line, Stream Replay technique imposes the following requirements of the system design:

  • The system is able to store the raw input data for a preconfigured period time.
  • The system is able to revoke a part of the produced results, replay the corresponding input data and produce a new version of the results.
  • The system should work fast enough to rewind the data back in time, replay them, and then catch up with the constantly arriving data stream.

Lineage Tracking

In a streaming system, events flow through a chain of processors until the result reaches the final destination (like an external database). Each input event produces a directed graph of descendant events (lineage) that ends by the final results. To guarantee reliable data processing, it is necessary to ensure that the entire graph was processed successfully and to restart processing in case of failures.

Efficient lineage tracking is not a trivial problem. Let us first consider how Twitter’s Storm tracks the messages to guarantee at-least-once delivery semantics (see the diagram below):

  • All events that emitted by the sources (first nodes in the data processing graph) are marked by a random ID. For each source, the framework maintains a set of pairs [event ID -> signature] for each initial event. The signature is initially initialized by the event ID.
  • Downstream nodes can generate zero or more events based on the received initial event. Each event carries its own random ID and the ID of the initial event.
  • If the event is successfully received and processed by the next node in the graph, this node updates the signature of the corresponding initial event by XORing the signature with (a) ID of the incoming event and (b) IDs of all events produced based on the incoming event. In the part 2 of diagram below, event 01111 produces events 01100, 10010, and 00010, so the signature for event 01111 becomes 11100 (= 01111 (initial value) xor 01111 xor 01100 xor 10010 xor 00010).
  • An event can be produced based on more than one incoming event. In this case, it is attached several initial event and carries more than one initial IDs downstream (yellow-black event in the part 3 of the figure below).
  • The event considered to be successfully processed as soon as its signature turns into zero i.e. the final node acknowledged that the last event in the graph was processed successfully and no events were emitted downstream. The framework sends a commit message to the source node (see part 3 in the diagram below).
  • The framework traverses a table of the initial events periodically looking for old uncommitted events (events with non-zero signature). Such events are considered as failed and the framework asks the source nodes to replay them.
  • It is important to note that the order of signature updates is not important due to commutative nature of the XOR operation. In the figure below, acknowledgements depicted in the part 2 can arrive after acknowledgements depicted in the part 3. This enables fully asynchronous processing.
  • One can note that the algorithm above is not strictly reliable – the signature could turn into zero accidentally due to unfortunate combination of IDs. However, 64-bit IDs are sufficient to guarantee a very low probability of error, about 2^(-64), that is acceptable in almost all practical applications. As result, the table of signatures could have a small memory footprint.lineage-tracking-storm

The described approach is elegant due to its decentrilized nature: nodes act independently sending acknowledgement messages, there is no cental entity that tracks all lineages explicitly. However, it could be difficult to manage transactional processing in this way for flows that maintain sliding windows or other buffers. For example, processing on a sliding window can involve hundreds of thousands events at each moment of time, so it becomes difficult to manage acknowledgements because many events stay uncommitted or computational state should be persisted frequently.

An alternative approach is used in Apache Spark [3]. The idea is to consider the final result as a function of the incoming data. To simplify lineage tracking, the framework processes events in batches, so the result is a sequence of batches where each batch is a function of the input batches. Resulting batches can be computed in parallel and if some computation fails, the framework simply reruns it. Consider an example:

stream-join-microbatching-tx

In this example, the framework joins two streams on a sliding window and then the result passes through one more processing stage. The framework considers the incoming streams not as streams, but as set of batches. Each batch has an ID and the framework can fetch it by the ID at any moment of time. So, stream processing can be represented as a bunch of transactions where each transaction takes a group of input batches, transforms them using a processing function, and persists a result. In the figure above, one of such transactions is highlighted in red. If the transaction fails, the framework simply reruns it. It is important that transactions can be executed in parallel.

This simple but powerful paradigm enables centralized transaction management and inherently provides exactly-once message processing semantics. It is worth noting that this technique can be used both for batch processing and for stream processing because it treats the input data as a set of batches regardless to their streaming of static nature.

State Checkpointing

In the previous section we have considered the lineage tracking algorithm that uses signatures (checksums) to provide at-least-one message delivery semantics. This technique improves reliability of the system, but it leaves at least two major open questions:

  • In many cases, exactly-once processing semantics is required. For example, the pipeline that counts events can produce incorrect results if some messages will be delivered twice.
  • Nodes in the pipeline can have a computational state that is updated as the messages processed. This state can be lost in case of node failure, so it is necessary to persist or replicate it.

Twitter’s Storm addresses these issues by using the following protocol:

  • Events are grouped into batches and each batch is associated with a transaction ID. A transaction ID is a monotonically growing numerical value (e.g. the first batch has ID 1, the second ID 2, and so on). If the pipeline fails to process a batch, this batch is re-emitted with the same transaction ID.
  • First, the framework announces to the nodes in the pipeline that a new transaction attempt is started. Second, the framework to sends the batch through the pipeline. Finally, the framework announces that transaction attempt if completed and all nodes can commit their state e.g. update it in the external database.
  • The framework guarantees that commit phases are globally ordered across all transactions i.e. the transaction 2 can never be committed before the transaction 1. This guarantee enables processing nodes to use following logic of persistent state updates:
    • The latest transaction ID is persisted along with the state.
    • If the framework requests to commit the current transaction with the ID that differs from the ID value persisted in the database, the state can be updated e.g. a counter in the database can be incremented. Assuming a strong ordering of transactions, such update will happen exactly one for each batch.
    • If the current transaction ID equals to the value persisted in the storage, the node skips the commit because this is a batch replay. The node must have processed the batch earlier and updated the state accordingly, but the transaction failed due to an error somewhere else in the pipeline.
    • Strong order of commits is important to achieve exactly-once processing semantics. However, strictly sequential processing of transactions is not feasible because first nodes in the pipeline will often be idle waiting until processing on the downstream nodes is completed. This issues can be alleviated by allowing parallel processing of transactions but serialization of commit steps only, as it shown in the figure below:

pipelining-commits-2

This technique allows one to achieve exactly-once processing semantics assuming that data sources are fault-tolerant and can be replayed. However, persistent state updates can cause serious performance degradation even if large batches are used. By this reason, the intermediate computational state should be minimized or avoided whenever possible.

As a footnote, it is worth mentioning that state writing can be implemented in different ways. The most straightforward approach is to dump in-memory state to the persistent store as part of the transaction commit process. This does not work well for large states (sliding windows an so on). An alternative is to write a kind of transaction log i.e. a sequence of operations that transform the old state into the new one (for a sliding window it can be a set of added and evicted events). This approach complicates crash recovery because the state has to be reconstructed from the log, but can provide performance benefits in a variety of cases.

Additive State and Sketches

Additivity of intermediate and final computational results is an important property that drastically simplifies design, implementation, maintenance, and recovery of in-stream data processing systems. Additivity means that the computational result for a larger time range or a larger data partition can be calculated as a combination of results for smaller time ranges or smaller partitions. For example, a daily number of page views can be calculated as a sum of hourly numbers of page views. Additive state allows one to split processing of a stream into processing of batches that can be computed and re-computed independently and, as we discussed in the previous sections, this helps to simplify lineage tracking and reduce complexity of state maintenance.

It is not always trivial to achieve additivity:

  • In many cases, additivity is indeed trivial. For example, simple counters are additive.
  • In some cases, it is possible to achieve additivity by storing a small amount of additional information. For example, consider a system that calculates average purchase value in the internet shop for each hour. Daily average cannot be obtained from 24 hourly average values. However, the system can easily store a number of transactions along with each hourly average and it is enough to calculate the daily average value.
  • In many cases, it is very difficult or impossible to achieve additivity. For example, consider a system that counts unique visitors on some internet site. If 100 unique users visited the site yesterday and 100 unique user visited the site today, the total number of unique user for two days can be from 100 to 200 depends on how many users visited the site both yesterday and today. One have to maintain lists of user IDs to achieve additivity through intersection/union of the ID lists. Size and processing complexity for these lists can be comparable to the size and processing complexity of the raw data.

Sketches is a very efficient way to transform non-additive values into additive. In the previous example, lists of ID can be replaced by compact additive statistical counters. These counters provide approximations instead of precise result, but it is acceptable for many practical applications. Sketches are very popular in certain areas like internet advertising and can be considered as an independent pattern of in-stream processing. A thorough overview of the sketching techniques can be found in [5].

Logical Time Tracking

It is very common for in-stream computations to depend on time: aggregations and joins are often performed on sliding time windows; processing logic often depends on a time interval between events and so on. Obviously, the in-stream processing system should have a notion of application’s view of time, instead of CPU wall-clock. However, proper time tracking is not trivial because data streams and particular events can be replayed in case of failures. It is often a good idea to have a notion of global logical time that can be implemented as follows:

  • All events should be marked with a timestamp generated by the original application.
  • Each processor in a pipeline tracks the maximal timestamp it has seen in a stream and updates a global persistent clock by this timestamp if the global clock is behind. All other processors synchronize their time with the global clock.
  • Global clock can be reset in case of data replay.

Aggregation in a Persistent Store

We already have discussed that persistent store can be used for state checkpointing. However, it not the only way to employ an external store for in-stream processing. Let us consider an example that employs Cassandra to join multiple data streams over a time window. Instead of maintaining in-memory event buffers, one can simply save all incoming events from all data streams to Casandra using a join key as row key, as it shown in the figure below:

cassandra-join

On the other side, the second process traverses the records periodically, assembles and emits joined events, and evicts the events that fell out of the time window. Cassandra even can facilitate this activity by sorting events according to their timestamps.

It is important to understand that such techniques can defeat the whole purpose of in-stream data processing if implemented incorrectly – writing individual events to the data store can introduce a serious performance bottleneck even for fast stores like Cassandra or Redis. On the other hand, this approach provides perfect persistence of the computational state and different performance optimizations – say, batch writes – can help to achieve acceptable performance in many use cases.

Aggregation on a Sliding Window

In-stream data processing frequently deals with queries like “What is the sum of the values in the stream over last 10 minutes?” i.e. with continuous queries on a sliding time window. A straightforward approach to processing of such queries is to compute the aggregation function like sum for each instance of the time window independently. It is clear that this approach is not optimal because of the high similarity between two sequential instances of the time window. If the window at the time T contains samples {s(0), s(1), s(2), …, s(T-1), s(T)}, then the window at the time T+1 contains samples {s(1), s(2), s(3), …, s(T), s(T+1)}. This observation suggests that incremental processing might be used.

Incremental computations over sliding windows is a group of techniques that are widely used in digital signal processing, in both software and hardware. A typical example is a computation of the sum function. If the sum over the current time window is known, then the sum over the next time window can be computed by adding a new sample and subtracting the eldest sample in the window:

inremental-aggregation

Similar techniques exist not only for simple aggregations like sums or products, but also for more complex transformations. For example, the SDFT (Sliding Discreet Fourier Transform) algorithm [4] is a computationally efficient alternative to per-window calculation of the FFT (Fast Fourier Transform) algorithm.

Query Processing Pipeline: Storm, Cassandra, Kafka

Now let us return to the practical problem that was stated in the beginning of this article. We have designed and implemented our in-stream data processing system on top of Storm, Kafka, and Cassandra adopting the techniques described earlier in this article. Here we provide just a very brief overview of the solution – a detailed description of all implementation pitfalls and tricks is too large and probably requires a separate article.

storm-kafka-cassandra-system

The system naturally uses Kafka 0.8 as a partitioned fault-tolerant event buffer to enable stream replay and improve system extensibility by easy addition of new event producers and consumers. Kafka’s ability to rewind read pointers also enables random access to the incoming batches and, consequently, Spark-style lineage tracking. It is also possible to point the system input to HDFS to process the historical data.

Cassandra is employed for state checkpointing and in-store aggregation, as described earlier. In many use cases, it also stores the final results.

Twitter’s Storm is a backbone of the system. All active query processing is performed in Storm’s topologies that interact with Kafka and Cassandra. Some data flows are simple and straightforward: the data arrives to Kafka; Storm reads and processes it and persist the results to Cassandra or other destination. Other flows are more sophisticated: one Storm topology can pass the data to another topology via Kafka or Cassandra. Two examples of such flows are shown in the figure above (red and blue curved arrows).

Towards Unified Big Data Processing

It is great that the existing technologies like Hive, Storm, and Impala enable us to crunch Big Data using both batch processing for complex analytics and machine learning, and real-time query processing for online analytics, and in-stream processing for continuous querying. Moreover, techniques like Lambda Architecture [6, 7] were developed and adopted to combine these solutions efficiently. This brings us to the question of how all these technologies and approaches could converge to a solid solution in the future.  In this section, we discuss the striking similarity between distributed relational query processing, batch processing, and in-stream query processing to figure out the technologies that could cover all these use cases and, consequently, have the highest potential in this area.

The key observation is that relational query processing, MapReduce, and in-stream processing could be implemented using exactly the same concepts and techniques like shuffling and pipelining. At the same time:

  • In-stream processing could require strict data delivery guarantees and persistence of the intermediate state. These properties are not crucial for batch processing where computations can be easily restarted.
  • In-stream processing is inseparable from pipelining. For batch processing, pipelining is not so crucial and even inapplicable in certain cases. Systems like Apache Hive are based on staged MapReduce with materialization of the intermediate state and do not take full advantage of pipelining.

The two statement above imply that tunable persistence (in-memory message passing versus on-disk materialization) and reliability are the distinctive features of the imaginary query engine that provides a set of processing primitives and interfaces to the high-level frameworks:

unified-engine

Among the emerging technologies, the following two are especially notable in the context of this discussion:

  • Apache Tez [8], a part of the Stinger Initiative [9]. Apache Tez is designed to succeed the MapReduce framework introducing a set of fine-grained query processing primitives. The goal is to enable frameworks like Apache Pig and Apache Hive to decompose their queries and scripts into efficient query processing pipelines instead of sequences of MapReduce jobs that are generally slow due to materialization of intermediate results.
  • Apache Spark [10]. This project is probably the most advanced and promising technology for unified Big Data processing that already includes a batch processing framework, SQL query engine, and a stream processing framework.

References

  1. A. Wilschut and P. Apers, “Dataflow Query Execution in a Parallel Main-Memory Environment “
  2. T. Urhan and M. Franklin, “XJoin: A Reactively-Scheduled Pipelined Join Operator“
  3. M. Zaharia, T. Das, H. Li, S. Shenker, and I. Stoica, “Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters”
  4. E. Jacobsen and R. Lyons, “The Sliding DFT“
  5. A. Elmagarmid, Data Streams Models and Algorithms
  6. N. Marz, “Big Data Lambda Architecture”
  7. J. Kinley, “The Lambda architecture: principles for architecting realtime Big Data systems”
  8. http://hortonworks.com/hadoop/tez/
  9. http://hortonworks.com/stinger/
  10. http://spark-project.org/
发表在 实时数据处理 | In-Stream Big Data Processing已关闭评论

布隆过滤器(Bloom Filter)

在日常生活中,包括在设计计算机软件时,我们经常要判断一个元素是否在一个集合中。比如在字处理软件中,需要检查一个英语单词是否拼写正确(也就是要判断它是否在已知的字典中);在 FBI,一个嫌疑人的名字是否已经在嫌疑名单上;在网络爬虫里,一个网址是否被访问过等等。最直接的方法就是将集合中全部的元素存在计算机中,遇到一个新元素时,将它和集合中的元素直接比较即可。一般来讲,计算机中的集合是用哈希表(hash table)来存储的。它的好处是快速准确,缺点是费存储空间。当集合比较小时,这个问题不显著,但是当集合巨大时,哈希表存储效率低的问题就显现出来了。比如说,一个象 Yahoo,Hotmail 和 Gmai 那样的公众电子邮件(email)提供商,总是需要过滤来自发送垃圾邮件的人(spamer)的垃圾邮件。一个办法就是记录下那些发垃圾邮件的 email 地址。由于那些发送者不停地在注册新的地址,全世界少说也有几十亿个发垃圾邮件的地址,将他们都存起来则需要大量的网络服务器。如果用哈希表,每存储一亿个 email 地址, 就需要 1.6GB 的内存(用哈希表实现的具体办法是将每一个 email 地址对应成一个八字节的信息指纹googlechinablog.com/2006/08/blog-post.html,然后将这些信息指纹存入哈希表,由于哈希表的存储效率一般只有 50%,因此一个 email 地址需要占用十六个字节。一亿个地址大约要 1.6GB, 即十六亿字节的内存)。因此存贮几十亿个邮件地址可能需要上百 GB 的内存。除非是超级计算机,一般服务器是无法存储的。

今天,我们介绍一种称作布隆过滤器的数学工具,它只需要哈希表 1/8 到 1/4 的大小就能解决同样的问题。

布隆过滤器是由巴顿.布隆于一九七零年提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。我们通过上面的例子来说明起工作原理。

假定我们存储一亿个电子邮件地址,我们先建立一个十六亿二进制(比特),即两亿字节的向量,然后将这十六亿个二进制全部设置为零。对于每一个电子邮件地址 X,我们用八个不同的随机数产生器(F1,F2, …,F8) 产生八个信息指纹(f1, f2, …, f8)。再用一个随机数产生器 G 把这八个信息指纹映射到 1 到十六亿中的八个自然数 g1, g2, …,g8。现在我们把这八个位置的二进制全部设置为一。当我们对这一亿个 email 地址都进行这样的处理后。一个针对这些 email 地址的布隆过滤器就建成了。(见下图)

现在,让我们看看如何用布隆过滤器来检测一个可疑的电子邮件地址 Y 是否在黑名单中。我们用相同的八个随机数产生器(F1, F2, …, F8)对这个地址产生八个信息指纹 s1,s2,…,s8,然后将这八个指纹对应到布隆过滤器的八个二进制位,分别是 t1,t2,…,t8。如果 Y 在黑名单中,显然,t1,t2,..,t8 对应的八个二进制一定是一。这样在遇到任何在黑名单中的电子邮件地址,我们都能准确地发现。

布隆过滤器决不会漏掉任何一个在黑名单中的可疑地址。但是,它有一条不足之处。也就是它有极小的可能将一个不在黑名单中的电子邮件地址判定为在黑名单中,因为有可能某个好的邮件地址正巧对应个八个都被设置成一的二进制位。好在这种可能性很小。我们把它称为误识概率。在上面的例子中,误识概率在万分之一以下。

布隆过滤器的好处在于快速,省空间。但是有一定的误识别率。常见的补救办法是在建立一个小的白名单,存储那些可能别误判的邮件地址。

发表在 代码片段, 算法 | 布隆过滤器(Bloom Filter)已关闭评论