.NET for Spark可用于处理成批数据、实时流、机器学习和ad-hoc查询。在这篇博客文章中,我们将探讨如何使用.NET for Spark执行一个非常流行的大数据任务,即日志分析。
1 什么是日志分析?
日志分析的目标是从这些日志中获得有关工具或服务的活动和性能的有意义的见解。NET for Spark使我们能够快速高效地分析从兆字节到千兆字节的日志数据!
在这篇文章中,我们将分析一组Apache日志条目,这些条目表示用户如何与web服务器上的内容交互。您可以在这里查看Apache日志条目的示例。
2 编写一个应用
日志分析是Spark批量处理的一个例子。批处理是静态数据的转换,意味着源数据已经加载到数据存储中。在我们的例子中,输入文本文件已经填充了日志,并且在处理时不会接收新的或更新的日志。
在为Spark应用程序创建新的.NET时,我们只需要遵循以下几个步骤,就可以开始从我们的数据中获得这些有趣的见解:
-
-
- 创建Spark会话
- 读取输入数据,通常使用DataFrame
- 操作和分析输入数据,通常使用Spark SQL
-
2.1 创建Spark会话
在任何Spark应用程序中,我们首先建立一个新的SparkSession,它是使用Spark编程的入口点:
SparkSession spark = SparkSession .Builder() .AppName("Apache User Log Processing") .GetOrCreate();
2.2 读取输入数据
我们将输入数据存储在DataFrame中,DataFrame是数据的分布式集合,它把数据组织为命名列的集合:
DataFrame generalDf = spark.Read().Text("<path to input data set>");
2.3 操纵和分析输入的数据
重要的第一步是数据准备。数据准备包括以某种方式清理我们的数据。这可能包括删除不完整的条目以避免以后计算中出现错误,或者删除不相关的输入以提高性能。
在我们的示例中,我们应该首先确保所有条目都是完整的日志。我们可以通过将每个日志条目与一个正则表达式进行匹配来实现这一点。
string s_apacheRx = "^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] "(S+) (S+) (S+)" (d{3}) (d+)";
我们如何对DataFrame的每一行执行计算,比如将每个日志条目与上面的s_apacheRx进行匹配?答案是Spark SQL。
2.4 Spark SQL
Spark SQL为处理DataFrame中存储的结构化数据提供了许多很棒的函数。Spark SQL最流行的特性之一是UDF(用户自定义函数)。我们定义它们接受的输入类型和产生的输出类型,然后定义它们如何执行计算或筛选。
让我们定义一个新的UDF GeneralReg,将每个日志条目与s_apacheRx 进行匹配。我们的UDF需要一个Apache日志条目,它是一个字符串,并根据日志是否与s_apacheRx匹配返回true或false:
spark.Udf().Register<string, bool>("GeneralReg", log => Regex.IsMatch(log, s_apacheRx));
除了UDF之外,Spark SQL还提供了编写SQL调用来分析我们的数据的能力,通常编写一个SQL调用来将UDF应用于每一行数据:
DataFrame generalDf = spark.Sql("SELECT logs.value, GeneralReg(logs.value) FROM Logs");
这个SQL调用测试generalDf的每一行,以确定它是否是一个有效且完整的日志。
我们可以使用.Filter()只在数据中保留完整的日志条目,然后使用.Show()显示新筛选的DataFrame:
generalDf = generalDf.Filter(generalDf["GeneralReg(value)"]); generalDf.Show();
现在我们已经完成了一些初始数据准备,我们可以继续过滤和分析我们的数据。让我们从以10开头的IP地址中查找与垃圾邮件相关的日志条目:
// Choose valid log entries that start with 10 spark.Udf().Register<string, bool>( "IPReg", log => Regex.IsMatch(log, "^(?=10)")); generalDf.CreateOrReplaceTempView("IPLogs"); // Apply UDF to get valid log entries starting with 10 DataFrame ipDf = spark.Sql( "SELECT iplogs.value FROM IPLogs WHERE IPReg(iplogs.value)"); ipDf.Show(); // Choose valid log entries that start with 10 and deal with spam spark.Udf().Register<string, bool>( "SpamRegEx", log => Regex.IsMatch(log, "\b(?=spam)\b")); ipDf.CreateOrReplaceTempView("SpamLogs"); // Apply UDF to get valid, start with 10, spam entries DataFrame spamDF = spark.Sql( "SELECT spamlogs.value FROM SpamLogs WHERE SpamRegEx(spamlogs.value)");
最后,让我们计算最后清理的数据集中GET请求的数量。.NET for Spark的神奇之处在于,我们可以将其与其他流行的.NET功能结合起来编写我们的应用程序。我们将使用LINQ分析Spark应用程序中的数据:
int numGetRequests = spamDF .Collect() .Where(r => ContainsGet(r.GetAs<string>("value"))) .Count();
在上面的代码中,ContainsGet()使用regex匹配检查GET请求:
// Use regex matching to group data // Each group matches a column in our log schema // i.e. first group = first column = IP public static bool ContainsGet(string logLine) { Match match = Regex.Match(logLine, s_apacheRx); // Determine if valid log entry is a GET request if (match.Success) { Console.WriteLine("Full log entry: '{0}'", match.Groups[0].Value); // 5th column/group in schema is "method" if (match.Groups[5].Value == "GET") { return true; } } return false; }
作为最后一步,我们调用Spark.Stop()关闭底层的Spark会话和Spark上下文。
3 运行程序
需要使用spark-submit命令,该命令将提交您的应用程序以便在Apache Spark上运行。
spark-submit命令包括:
-
-
- –class,用于调用DotnetRunner
- –master, 用于定义是本地还是云端的Spark提交
- Path,Microsoft.Spark jar的路径
- 应用程序的其他参数或依赖项,例如输入文件或包含UDF定义的dll的路径。
-
运行应用程序的Windows命令示例如下:
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<version>.jar dotnet /path/to/netcoreapp<version>/LoggingApp.dll
4 .NET for Apache Spark总结
这里只是一个简单的介绍,.NET for Apache Spark是一个免费、开源、跨平台的大数据分析框架,更多的功能需要读者进一步深入学习。
名词解释
Ad-hoc Query
Ad-hoc Query是一种“on-the-fly”的特殊查询。也就是说,在每一个查询操作被执行之前,查询的目标对象是不明确的。
比如下面这样一条语句:
var mySqlQuery = "SELECT * FROM table WHERE id = " + std_name;
每次执行这一条查询的时候返回的结果都可能会不一样,这取决于std_name的值。