Spark Apache Log Analysis and Stream Data Processing Tutorial

Uploaded 2015-02-06 22:28:32, 556 KB, PDF

Databricks Spark Reference Applications: Spark log analysis and stream data processing, with Java 8 code
Databricks Reference Apps

At Databricks, we are developing a set of reference applications that demonstrate how to use Apache Spark. This book/repo contains the reference applications.

- View the code in the GitHub repo here: https://github.com/databricks/reference-apps
- Read the documentation here: http://databricks.gitbooks.io/databricks-spark-reference-applications
- Submit feedback or issues here: https://github.com/databricks/reference-apps/issues

The reference applications will appeal to those who want to learn Spark and learn better by example. Browse the applications, see what features of the reference applications are similar to the features you want to build, and refashion the code samples for your needs. Additionally, this is meant to be a practical guide for using Spark in your systems, so the applications mention other technologies that are compatible with Spark, such as what file systems to use for storing your massive data sets.

Log Analysis Application: The log analysis reference application contains a series of tutorials for learning Spark by example, as well as a final application that can be used to monitor Apache access logs.
The examples use Spark in batch mode and cover Spark SQL as well as Spark Streaming.

Twitter Streaming Language Classifier: This application demonstrates how to fetch and train a language classifier for Tweets using Spark MLlib. Then Spark Streaming is used to call the trained classifier and filter out live tweets that match a specified cluster. To build this example, go into twitter_classifier/scala and follow the directions in the README.

This reference app is covered by license terms covered here.

Log Analysis with Spark

This project demonstrates how easy it is to do log analysis with Apache Spark.

Log analysis is an ideal use case for Spark. Logs are a very large, common data source and contain a rich set of information. Spark allows you to store your logs in files on disk cheaply, while still providing a quick and simple way to process them. We hope this project will show you how to use Apache Spark on your organization's production logs and fully harness the power of that data. Log data can be used for monitoring your servers, improving business and customer intelligence, building recommendation systems, preventing fraud, and much more.

How to use this project

This project is broken up into sections with bite-sized examples for demonstrating new Spark functionality for log processing. This makes the examples easy to run and learn, as they cover just one new topic at a time. At the end, we assemble some of these examples to form a sample log analysis application.

Section 1: Introduction to Apache Spark. The Apache Spark library is introduced, as well as Spark SQL and Spark Streaming.
By the end of this chapter, a reader will know how to call transformations and actions and work with RDDs and streams.

Section 2: Importing Data. This section includes examples to illustrate how to get data into Spark and starts covering concepts of distributed computing. The examples are all suitable for datasets that are too large to be processed on one machine.

Section 3: Exporting Data. This section includes examples to illustrate how to get data out of Spark. Again, concepts of a distributed computing environment are reinforced, and the examples are suitable for large datasets.

Section 4: Logs Analyzer Application. This section puts together some of the code in the other chapters to form a sample log analysis application.

More to come

While that's all for now, there's definitely more to come over time.

Section 1: Introduction to Apache Spark

In this section, we demonstrate how simple it is to analyze web logs using Apache Spark. We'll show how to load a Resilient Distributed Dataset (RDD) of access log lines and use Spark transformations and actions to compute some statistics for web server monitoring. In the process, we'll introduce the Spark SQL and the Spark Streaming libraries.

In this explanation, the code snippets are in Java 8. However, there is also sample code in Java 6, Scala, and Python included in this directory. Those folders contain READMEs with instructions on how to build and run the examples, and the necessary build files with all the required dependencies.

This chapter covers the following topics:

1. First Log Analyzer in Spark - This is a first Spark standalone logs analysis application.
2. Spark SQL - This example does the same thing as the above example, but uses SQL syntax instead of Spark transformations and actions.
3. Spark Streaming - This example covers how to calculate log statistics using the streaming library.

First Logs Analyzer in Spark

Before beginning this section, go through the Spark Quick Start and familiarize yourself with the Spark Programming Guide first.

This section requires a dependency on the Spark Core library in the Maven file. Note: update this dependency based on the version of Spark you have installed.

    <dependency> <!-- Spark -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

Before we can begin, we need two things:

- An Apache access log file: If you have one, it's more interesting to use real data.
  - A trivial sample one is provided at data/apache.access.log.
  - Or download a better example here: http://www.monitorware.com/en/logsamples/apache.php
- A parser and model for the log file: See ApacheAccessLog.java.

The example code uses an Apache access log file since that's a well-known and common log format. It would be easy to rewrite the parser if you have data in another log format.

The following statistics will be computed:

- The average, min, and max content size of responses returned from the server.
- A count of response codes returned.
- All IP addresses that have accessed this server more than N times.
- The top endpoints requested by count.

Let's walk through the code first before running the example at LogAnalyzer.java.

The main body of a simple Spark application is below. The first step is to bring up a Spark context. Then the Spark context can load data from a text file as an RDD, which it can then process. Finally, before exiting the function, the Spark context is stopped.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LogAnalyzer {
      public static void main(String[] args) {
        // Create a Spark Context.
        SparkConf conf = new SparkConf().setAppName("Log Analyzer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the text file into Spark.
        if (args.length == 0) {
          System.out.println("Must specify an access logs file.");
          System.exit(-1);
        }
        String logFile = args[0];
        JavaRDD<String> logLines = sc.textFile(logFile);

        // TODO: Insert code here for processing logs.

        sc.stop();
      }
    }

Given an RDD of log lines, use the map function to transform each line to an ApacheAccessLog object. The ApacheAccessLog RDD is cached in memory, since multiple transformations and actions will be called on it.

    // Convert the text log lines to ApacheAccessLog objects and
    // cache them since multiple transformations and actions
    // will be called on the data.
    JavaRDD<ApacheAccessLog> accessLogs =
        logLines.map(ApacheAccessLog::parseFromLogLine).cache();

It's useful to define a sum reducer: a function that takes in two numbers (Longs here) and returns their sum. This is used all over our example.

    private static Function2<Long, Long, Long> SUM_REDUCER = (a, b) -> a + b;

Next, let's calculate the average, minimum, and maximum content size of the responses returned. A map transformation extracts the content sizes, and then different actions (reduce, count, min, and max) are called to output various stats. Again, call cache on the content size RDD to avoid recalculating those values for each action called on it.

    // Calculate statistics based on the content size.
    // Note how the contentSizes are cached as well since multiple actions
    // are called on that RDD.
    JavaRDD<Long> contentSizes =
        accessLogs.map(ApacheAccessLog::getContentSize).cache();
    System.out.println(String.format("Content Size Avg: %s, Min: %s, Max: %s",
        contentSizes.reduce(SUM_REDUCER) / contentSizes.count(),
        contentSizes.min(Comparator.naturalOrder()),
        contentSizes.max(Comparator.naturalOrder())));

To compute the response code counts, we have to work with key-value pairs by using mapToPair and reduceByKey. Notice that we call take(100) instead of collect() to gather the final output of the response code counts. Use extreme caution before calling collect() on an RDD, since all that data will be sent to a single Spark driver and can cause the driver to run out of memory.
Even in this case, where there are only a limited number of response codes and it seems safe, malformed lines in the Apache access log or a bug in the parser could produce enough invalid response codes to cause an out-of-memory error.

    // Compute Response Code to Count.
    List<Tuple2<Integer, Long>> responseCodeToCount = accessLogs
        .mapToPair(log -> new Tuple2<>(log.getResponseCode(), 1L))
        .reduceByKey(SUM_REDUCER)
        .take(100);
    System.out.println(String.format("Response code counts: %s", responseCodeToCount));

To compute any IP address that has accessed this server more than 10 times, we call the filter transformation and then map to retrieve only the IP address and discard the count. Again we use take(100) to retrieve the values.

    List<String> ipAddresses = accessLogs
        .mapToPair(log -> new Tuple2<>(log.getIpAddress(), 1L))
        .reduceByKey(SUM_REDUCER)
        .filter(tuple -> tuple._2() > 10)
        .map(Tuple2::_1)
        .take(100);
    System.out.println(String.format("IPAddresses > 10 times: %s", ipAddresses));

Last, let's calculate the top endpoints requested in this log file. We define an inner class, ValueComparator, to help with that. This comparator tells us, given two tuples, which one is first in ordering. The key of the tuple is ignored, and ordering is based just on the values.

    private static class ValueComparator<K, V>
        implements Comparator<Tuple2<K, V>>, Serializable {
      private Comparator<V> comparator;

      public ValueComparator(Comparator<V> comparator) {
        this.comparator = comparator;
      }

      @Override
      public int compare(Tuple2<K, V> o1, Tuple2<K, V> o2) {
        return comparator.compare(o1._2(), o2._2());
      }
    }

Then, we can use the ValueComparator with the top action to compute the top endpoints accessed on this server according to how many times the endpoint was accessed.

    List<Tuple2<String, Long>> topEndpoints = accessLogs
        .mapToPair(log -> new Tuple2<>(log.getEndpoint(), 1L))
        .reduceByKey(SUM_REDUCER)
        .top(10, new ValueComparator<>(Comparator.<Long>naturalOrder()));
    System.out.println("Top Endpoints: " + topEndpoints);

These code snippets are from LogAnalyzer.java.
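The walkthrough above relies on ApacheAccessLog.java for parsing, which is not reproduced in this section. As a rough illustration only, a parser for one line in the common Apache access log layout might look like the sketch below. The class name AccessLogLine, its fields, and the regular expression are assumptions made for this example, not the project's actual code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the ApacheAccessLog model; names are illustrative. */
final class AccessLogLine {
    // Assumed layout: host identity user [timestamp] "method endpoint protocol" status size
    private static final Pattern LOG_PATTERN = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "
        + "\"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+|-)");

    final String ipAddress;
    final String endpoint;
    final int responseCode;
    final long contentSize;

    private AccessLogLine(String ipAddress, String endpoint,
                          int responseCode, long contentSize) {
        this.ipAddress = ipAddress;
        this.endpoint = endpoint;
        this.responseCode = responseCode;
        this.contentSize = contentSize;
    }

    /** Parses one log line; throws if the line does not match the assumed layout. */
    static AccessLogLine parse(String line) {
        Matcher m = LOG_PATTERN.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("Cannot parse log line: " + line);
        }
        // Apache writes "-" when no body was sent; treat that as size 0.
        long size = m.group(9).equals("-") ? 0L : Long.parseLong(m.group(9));
        return new AccessLogLine(
            m.group(1), m.group(6), Integer.parseInt(m.group(8)), size);
    }

    public static void main(String[] args) {
        AccessLogLine log = parse(
            "127.0.0.1 - - [21/Jul/2014:09:55:27 -0800] \"GET /home.html HTTP/1.1\" 200 2048");
        System.out.println(log.ipAddress + " " + log.endpoint + " "
            + log.responseCode + " " + log.contentSize);
    }
}
```

Failing loudly on a malformed line is one possible choice. Since the section warns that malformed lines can inflate the set of response codes, a real parser may prefer to skip or count bad lines instead.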
Now that we've walked through the code, try running that example. See the README for language-specific instructions for building and running.

Spark SQL

You should go through the Spark SQL Guide before beginning this section.

This section requires an additional dependency on Spark SQL:

    <dependency> <!-- Spark SQL -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

For those of you who are familiar with SQL, the same statistics we calculated in the previous example can be computed using Spark SQL rather than calling Spark transformations and actions directly. We walk through how to do that here.

First, we need to create a SQL Spark context. Note how we create one Spark context, and then use that to instantiate different flavors of Spark contexts. You should not initialize multiple Spark contexts from the SparkConf in one process.

    public class LogAnalyzerSQL {
      public static void main(String[] args) {
        // Create the spark context.
        SparkConf conf = new SparkConf().setAppName("Log Analyzer SQL");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlContext = new JavaSQLContext(sc);

        if (args.length == 0) {
          System.out.println("Must specify an access logs file.");
          System.exit(-1);
        }
        String logFile = args[0];
        JavaRDD<ApacheAccessLog> accessLogs = sc.textFile(logFile)
            .map(ApacheAccessLog::parseFromLogLine);

        // TODO: Insert code for computing log stats.

        sc.stop();
      }
    }

Next, we need a way to register our logs data into a table. In Java, Spark SQL can infer the table schema on a standard Java POJO with getters and setters, as we've done with ApacheAccessLog.java. (Note: if you are using a language other than Java, there is a different way for Spark to infer the table schema. The examples in this directory work out of the box. You can also refer to the Spark SQL Guide on Data Sources for more details.)

    JavaSchemaRDD schemaRDD = sqlContext.applySchema(accessLogs, ApacheAccessLog.class);
    schemaRDD.registerTempTable("logs");
    sqlContext.sqlContext().cacheTable("logs");

Now, we are ready to start running some SQL queries on our table. Here's the code to compute the same statistics as in the previous section. It should look very familiar to those of you who know SQL.

    // Calculate statistics based on the content size.
    Tuple4<Long, Long, Long, Long> contentSizeStats = sqlContext
        .sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM logs")
        .map(row -> new Tuple4<>(row.getLong(0), row.getLong(1), row.getLong(2), row.getLong(3)))
        .first();
    System.out.println(String.format("Content Size Avg: %s, Min: %s, Max: %s",
        contentSizeStats._1() / contentSizeStats._2(),
        contentSizeStats._3(),
        contentSizeStats._4()));

    // Compute Response Code to Count.
    // Note the use of "LIMIT 1000" since the number of responseCodes
    // can potentially be too large to fit in memory.
    List<Tuple2<Integer, Long>> responseCodeToCount = sqlContext
        .sql("SELECT responseCode, COUNT(*) FROM logs GROUP BY responseCode LIMIT 1000")
        .mapToPair(row -> new Tuple2<>(row.getInt(0), row.getLong(1)))
        .collect();
    System.out.println(String.format("Response code counts: %s", responseCodeToCount));

    // Any IPAddress that has accessed the server more than 10 times.
    List<String> ipAddresses = sqlContext
        .sql("SELECT ipAddress, COUNT(*) AS total FROM logs GROUP BY ipAddress HAVING total > 10 LIMIT 100")
        .map(row -> row.getString(0))
        .collect();
    System.out.println(String.format("IPAddresses > 10 times: %s", ipAddresses));

    // Top Endpoints.
    List<Tuple2<String, Long>> topEndpoints = sqlContext
        .sql("SELECT endpoint, COUNT(*) AS total FROM logs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
        .map(row -> new Tuple2<>(row.getString(0), row.getLong(1)))
        .collect();
    System.out.println(String.format("Top Endpoints: %s", topEndpoints));

Note that the default SQL dialect does not allow using reserved keywords as alias names.
In other words, SELECT COUNT(*) AS count will cause errors, but SELECT COUNT(*) AS the_count runs fine. If you use the HiveQL parser though, then you should be able to use anything as an identifier.

Try running LogAnalyzerSQL.java now.
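When checking query results against a small hand-made sample, it can help to have the same grouping logic in plain Java. The sketch below uses no Spark; it mirrors the GROUP BY / COUNT(*), HAVING total > N, and ORDER BY total DESC LIMIT n steps of the queries in this section with java.util.stream. The class LocalLogStats and its method names are hypothetical, and this is only a local cross-check, not a substitute for the distributed queries.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical local helper; mirrors the SQL queries on an in-memory sample. */
final class LocalLogStats {

    /** GROUP BY key with COUNT(*): number of occurrences of each key. */
    static <K> Map<K, Long> countByKey(List<K> keys) {
        return keys.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    /** HAVING total > threshold: keys whose count exceeds the threshold, sorted. */
    static <K extends Comparable<K>> List<K> keysWithCountAbove(
            Map<K, Long> counts, long threshold) {
        return counts.entrySet().stream()
            .filter(e -> e.getValue() > threshold)
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }

    /** ORDER BY total DESC LIMIT n: the n entries with the highest counts. */
    static <K> List<Map.Entry<K, Long>> topN(Map<K, Long> counts, int n) {
        return counts.entrySet().stream()
            .sorted(Map.Entry.<K, Long>comparingByValue().reversed())
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample of requested endpoints.
        List<String> endpoints = Arrays.asList("/a", "/b", "/a", "/c", "/a", "/b");
        Map<String, Long> counts = countByKey(endpoints);
        System.out.println("Counts: " + counts);
        System.out.println("Seen more than once: " + keysWithCountAbove(counts, 1));
        System.out.println("Top 2: " + topN(counts, 2));
    }
}
```

As with ORDER BY in SQL, the relative order of entries with equal counts is not specified in topN.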
