Archive for spark

Spark也是一个集群计算系统,提供Python,Java,Scala,R语言的高级API进行数据操作。Spark有各种优点,请自行搜索。 一、下载安装spark 下载编译好的二进制版本,目前还用不到hadoop选择第一个安装。 wget http://mirrors.cnnic.cn/apache/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz 解压 tar zxvf spark-1.5.2-bin-hadoop2.6.tgz 使用默认参数进入python交互模式。启动参数可以指定Spark集群的地址,处理的线程数等值。 park-1.5.2-bin-hadoop2.6/bin/pyspark 交互模式下,Spark会默认给你启动一个SparkContext,名字为sc。你可以执行以下命令查看版本和应用名字。 >>> sc.version u'1.5.2' >>> sc.appName u'PySparkShell' 二、分析nginx log文件 在交互模式下输入,创建一个RDD,textFile会按行读入一个文件,这种方式读入的文件,不会随文件的更新数据进行更新。 第二篇将进行脚本的编写和使用Stream进行实时数据的统计。 >>>log=sc.textFile('./app_access.log') 查看文档有多少行,对应nginx收到多少请求。count是一个action,返回值。 >>> log.count() 2186762 返回文档第一行 >>> log.first() u'117.136.40.185 [11/Nov/2015:06:25:52 +0800] "POST /zmw/v2/favorite_status HTTP/1.1" 200 0.013 61 "-" "%E6%B2%B3%E7%8B%B8%E5%AE%B6/2891 CFNetwork/758.1.6 Darwin/15.0.0" "-" 0.013 Upstream:"10.0.10.135:8700"' 查看IP为"117.136.40.185"的访问次数。filter是一个transformations,返回一个新的RDD,然后对这个新的rdd进行count求值 >>> log.filter(lambda line: "117.136.40.185" == line.split()[0]).count() 637 可以使用cache操作将经常使用RDD尽可能的存放在内存中,加快计算速度。 >>> log.cache() 统计每个ip的访问次数,还可以统计每个url的访问次数,HTTP status code分别的访问次数等相关类似统计 >>> ips = log.map(lambda line: (line.split()[0], 1)).reduceByKey(lambda a, b: a + b) >>> ips PythonRDD[16] at RDD at PythonRDD.scala:43 >>> ips.collect() 验证之前统计过的,数据是否一致 >>> b=ips.filter(lambda a: a[0]=="117.136.40.185") >>> b.collect() [(u'117.136.40.185', 637)] 三、监控 可以从WEB访问,查看job执行情况和统计数据 http://172.16.117.0:4040/jobs/

Continue