之前在网上看到某公司新招实习生的第一次作业是写 JSON Parser,好像之后还要写 Scheme 的 Parser,就自己也想试试。因为并不是工作任务,所以也没去查任何资料,准备自己憋。
但毕竟非科班,也完全没接触过语言或编译的内容,一上来完全摸不到头脑。于是决定先实现一个 json.dumps
找找规律。dumps 很好写,只要递归判断类型然后序列化就好了。于是想 loads
是不是也类似呢?
第一版实现使用了 str.split(',')
方法来分隔集合元素,后来发现一旦嵌套就不好使了,比如 "[1, [2, 3]]"
会被处理成 "1"
、"[2"
和 "3]"
。之后为了简化实现,把处理的元素类型限制在整数和数组。因为这是两种典型的标量和集合类型,其他类型的处理是类似的,只需要横向扩展判断逻辑即可。即写这个 Parser 的主要难题应该还是在于对嵌套集合类型的正确处理。
思考过程如下:
- 已知简单的按逗号分隔行不通,因为会误伤子集合类型的元素
- 若以
dumps
的逆向过程来理解的话,我们需要首先在顶层分割一个数组,然后递归处理其元素 - 既然是递归处理,在分割顶层元素时就不用考虑反序列化,只是分割成字符串即可
- 为了正确分割顶层元素,我需要能够区分字符所处的深度
即对于上例的 "[1, [2, 3]]"
,我需要首先把它分成 "1"
和 "[2, 3]"
,然后递归。简单实现的代码如下:
#lang pythonimport redef loads(s): if not s: raise ValueError() if not isinstance(s, str): raise TypeError() if is_list(s): return list(map(loads, split_list(s[1: -1]))) else: return int(s)def is_list(s): if s[0] == '[': assert s[-1] == ']' return True return Falsedef split_list(sub): elements = [] depth = 0 start, offset = 0, 0 while start < len(sub): if sub[start] in (',', ' '): start += 1 elif sub[start].isdigit(): offset = re.match('\d+(\.\d+)?', sub[start:]).end() elements.append(sub[start: start + offset]) start += offset offset = 0 elif sub[start] == '[': for char in sub[start:]: offset += 1 if char == '[': depth += 1 elif char == ']': depth -= 1 if depth == 0: break if depth == 0: elements.append(sub[start: start + offset]) start += offset offset = 0 else: raise ValueError() return elements
这个实现能够工作,但是性能很差,容易发现对于嵌套对象,读的遍数等于他的深度+1,理想情况下应该是一遍就处理完。且O(n)的遍历还有一个好处就是能够尽早的发现错误,比如如果是 [1, [2, *]]
这样的一个串,我只能在第二次(递归)调用 loads
时才能发现错误。
所以我决定去看一下官方的实现。发现官方的思路是这样的:
- 从前向后遍历 JSON 串,通过遇到的首字符判断需要调用的 parse 函数,含
parse_string
parse_object
parse_array
parse_float
parse_int
parse_constant
(null, true, false, NaN, Infinity, -Infinity)
- 每个 parse 函数接收原始 string 和 当前索引偏移量 end 为参数,返回处理完的对象和新的偏移量。
parse_array
和parse_object
会递归调用任何 parse 函数。
这样就是我之前想要却没写出来的效果——边读边做深度 parse。
添加注释后的 parse_array
如下:
(scan_once
即为上述第一步里的判断函数)
def JSONArray(s_and_end, scan_once, _w=WHITESPACE.match, _ws=WHITESPACE_STR): s, end = s_and_end # s 是原始 JSON 串,end 是当前偏移量 values = [] nextchar = s[end:end + 1] if nextchar in _ws: # 处理空白字符,找到有效的 nextchar end = _w(s, end + 1).end() nextchar = s[end:end + 1] # Look-ahead for trivial empty array if nextchar == ']': # 判断空数组 return values, end + 1 _append = values.append # 不懂为啥一定要把 append 单拎出来 while True: # 一个一个的处理 array 内的元素 try: value, end = scan_once(s, end) # 判断并调用 parse 函数的函数,即递归入口 except StopIteration: raise ValueError(errmsg("Expecting object", s, end)) _append(value) # 处理完毕,添加到 value 里 nextchar = s[end:end + 1] if nextchar in _ws: # 重复之前的内容——跳过空白符 end = _w(s, end + 1).end() nextchar = s[end:end + 1] end += 1 if nextchar == ']': # 判断结束 break elif nextchar != ',': raise ValueError(errmsg("Expecting ',' delimiter", s, end)) try: # 不太懂为啥要判断这么多次空白符 if s[end] in _ws: end += 1 if s[end] in _ws: end = _w(s, end + 1).end() except IndexError: pass return values, end
最后,因为这是个递归过程,所以默认状态下它的处理深度是有上限的,而且速度不算快。如果有高性能需求的应用,最好去寻找下第三方模块。或者感兴趣的人也可以自己实现一下,思路和官方差不多,改成循环或者开启尾递归优化即可。