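Feature to implement:
Write an interface for accessing Spark: entering a URL on the web submits a job that produces the information we need and returns it to us as JSON data.
Result:
A URL request returns a word count result as JSON (displayed here with the Postman extension for Google Chrome; entering the URL directly in the browser gives the same result).
Key technologies used:
Java, the Spring MVC framework, the Tomcat container, the Spark framework, and the related Scala dependencies
Overall architecture:
I built a web project with Maven; the pom file is as follows: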
<dependencies>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-compiler -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.11</version>
</dependency>
<!-- Spring framework jars -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-webmvc -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- persistence framework (MyBatis) -->
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.1</version>
</dependency>
<!-- MyBatis-Spring integration -->
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis-spring -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.3.0</version>
</dependency>
<!-- DBCP connection pool -->
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.9</version>
</dependency>
<!-- connection pool jar -->
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.18</version>
</dependency>
<!-- database driver -->
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
<!-- logging -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- JSON dependency; do not use the Jackson dependencies -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
web.xml configuration (only the Spring MVC container is configured here):
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
<display-name>Archetype Created Web Application</display-name>
<!-- Spring MVC front controller -->
<servlet>
<servlet-name>manager</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<!-- contextConfigLocation is optional; if it is not set, the Spring MVC configuration file defaults to WEB-INF/[servlet-name]-servlet.xml -->
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:springmvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>manager</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- fix garbled characters in POST requests -->
<filter>
<filter-name>CharacterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>utf-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>CharacterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- logging configuration -->
<context-param>
<param-name>log4jConfigLocation</param-name>
<param-value>classpath:log4j.properties</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
</listener>
</web-app>
Next is the Spring MVC configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- component scanning for controllers -->
<context:component-scan base-package="com.zzrenfeng.zhsx.controller" />
<!-- enable annotation-driven MVC -->
<mvc:annotation-driven />
<context:component-scan base-package="com.zzrenfeng.zhsx.service"></context:component-scan>
<context:component-scan base-package="com.zzrenfeng.zhsx.spark.service"></context:component-scan>
<context:component-scan base-package="com.zzrenfeng.zhsx.spark.conf"></context:component-scan>
</beans>
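That is all the configuration; other components can be integrated later if needed. Now on to the code.
Utility class for converting between objects and JSON:
(Why convert by hand instead of letting the Jackson dependencies convert automatically? When I tried Jackson, I found that it interfered with our Spark job, which terminated abnormally.)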
package com.zzrenfeng.zhsx.util;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.JsonConfig;
/**
* Utility class for converting between JSON and JavaBeans.
*
* @author
* @version
*
* {@code Currently implemented with the json-lib component.
* Requires these jars:
* json-lib-2.4-jdk15.jar
* ezmorph-1.0.6.jar
* commons-collections-3.1.jar
* commons-lang-2.0.jar
* }
*/
public class JsonUtil {
/**
* Builds a Java object from a JSON object string.
*
* @param jsonString the JSON text of a single object
* @param beanClass the target bean class
* @return the populated bean
*/
@SuppressWarnings("unchecked")
public static <T> T jsonToBean(String jsonString, Class<T> beanClass) {
JSONObject jsonObject = JSONObject.fromObject(jsonString);
T bean = (T) JSONObject.toBean(jsonObject, beanClass);
return bean;
}
/**
* Converts a Java object to a JSON string.
*
* @param bean the object to serialize
* @return the JSON text
*/
public static String beanToJson(Object bean) {
JSONObject json = JSONObject.fromObject(bean);
return json.toString();
}
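/**
* Converts a Java object to a JSON string, either keeping only the listed
* properties or excluding them.
*
* @param bean the object to serialize
* @param _nory_changes the property names to keep or to exclude
* @param nory true to keep only the listed properties, false to exclude them
* @return the JSON text
*/
public static String beanToJson(Object bean, String[] _nory_changes, boolean nory) {
JSONObject json = null;
if(nory){// serialize only the properties listed in _nory_changes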
Field[] fields = bean.getClass().getDeclaredFields();
String str = "";
for(Field field : fields){
// System.out.println(field.getName());
str+=(":"+field.getName());
}
fields = bean.getClass().getSuperclass().getDeclaredFields();
for(Field field : fields){
// System.out.println(field.getName());
str+=(":"+field.getName());
}
str+=":";
for(String s : _nory_changes){
str = str.replace(":"+s+":", ":");
}
json = JSONObject.fromObject(bean,configJson(str.split(":")));
}else{// serialize every property except those in _nory_changes
json = JSONObject.fromObject(bean,configJson(_nory_changes));
}
return json.toString();
}
private static JsonConfig configJson(String[] excludes) {
JsonConfig jsonConfig = new JsonConfig();
jsonConfig.setExcludes(excludes);
//
jsonConfig.setIgnoreDefaultExcludes(false);
//
// jsonConfig.setCycleDetectionStrategy(CycleDetectionStrategy.LENIENT);
// jsonConfig.registerJsonValueProcessor(Date.class,
//
// new DateJsonValueProcessor(datePattern));
return jsonConfig;
}
/**
* Converts a List of Java objects to a JSON string.
* @param beans the objects to serialize
* @return the JSON array text
*/
@SuppressWarnings("unchecked")
public static String beanListToJson(List beans) {
StringBuffer rest = new StringBuffer();
rest.append("[");
int size = beans.size();
for (int i = 0; i < size; i++) {
rest.append(beanToJson(beans.get(i))+((i<size-1)?",":""));
}
rest.append("]");
return rest.toString();
}
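/**
* Converts a List of Java objects to a JSON string, keeping or excluding the listed properties.
* @param beans the objects to serialize
* @param _nory_changes the property names to keep or to exclude
* @param nory true to keep only the listed properties, false to exclude them
* @return the JSON array text
*/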
@SuppressWarnings("unchecked")
public static String beanListToJson(List beans, String[] _nory_changes, boolean nory) {
StringBuffer rest = new StringBuffer();
rest.append("[");
int size = beans.size();
for (int i = 0; i < size; i++) {
try{
rest.append(beanToJson(beans.get(i),_nory_changes,nory));
if(i<size-1){
rest.append(",");
}
}catch(Exception e){
e.printStackTrace();
}
}
rest.append("]");
return rest.toString();
}
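/**
* Builds a map from a JSON object string. Every value is stored as a
* string, so a nested object is kept as its JSON text.
*
* @param jsonString the JSON text of a single object
* @return the key/value map
*/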
@SuppressWarnings({ "unchecked" })
public static Map jsonToMap(String jsonString) {
JSONObject jsonObject = JSONObject.fromObject(jsonString);
Iterator keyIter = jsonObject.keys();
String key;
Object value;
Map valueMap = new HashMap();
while (keyIter.hasNext()) {
key = (String) keyIter.next();
value = jsonObject.get(key).toString();
valueMap.put(key, value);
}
return valueMap;
}
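/**
* Converts a map to JSON text. List values are serialized as JSON arrays,
* other non-null values are serialized as beans, and null values are skipped.
* @param map the map to serialize
* @param _nory_changes the property names to keep or to exclude
* @param nory true to keep only the listed properties, false to exclude them
* @return the JSON object text
*/
public static String mapToJson(Map<String, ?> map, String[] _nory_changes, boolean nory){
StringBuilder s_json = new StringBuilder("{");
boolean first = true;
for (Map.Entry<String, ?> entry : map.entrySet()) {
Object value = entry.getValue();
if(value == null){
continue; // skip null values so no dangling comma is written
}
if(!first){
s_json.append(",");
}
first = false;
// quote the key so the output is valid JSON
s_json.append("\"").append(entry.getKey()).append("\":");
if(value instanceof List<?>){
s_json.append(beanListToJson((List<?>) value, _nory_changes, nory));
}else{
// serialize only this entry's value, not the whole map
s_json.append(beanToJson(value, _nory_changes, nory));
}
}
s_json.append("}");
return s_json.toString();
}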
/**
* Converts a JSON array string to a Java array.
*
* @param jsonString the JSON text of an array
* @return the array elements
*/
public static Object[] jsonToObjectArray(String jsonString) {
JSONArray jsonArray = JSONArray.fromObject(jsonString);
return jsonArray.toArray();
}
public static String listToJson(List<?> list) {
JSONArray jsonArray = JSONArray.fromObject(list);
return jsonArray.toString();
}
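/**
* Converts a JSON array of objects to a list of Java objects.
*
* @param jsonString the JSON text of an array of objects
* @param beanClass the target bean class
* @return the list of populated beans
*/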
@SuppressWarnings("unchecked")
public static <T> List<T> jsonToBeanList(String jsonString, Class<T> beanClass) {
JSONArray jsonArray = JSONArray.fromObject(jsonString);
JSONObject jsonObject;
T bean;
int size = jsonArray.size();
List<T> list = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
jsonObject = jsonArray.getJSONObject(i);
bean = (T) JSONObject.toBean(jsonObject, beanClass);
list.add(bean);
}
return list;
}
/**
* Parses a JSON array into a Java String array.
*
* @param jsonString the JSON text of an array
* @return the elements as strings
*/
public static String[] jsonToStringArray(String jsonString) {
JSONArray jsonArray = JSONArray.fromObject(jsonString);
String[] stringArray = new String[jsonArray.size()];
int size = jsonArray.size();
for (int i = 0; i < size; i++) {
stringArray[i] = jsonArray.getString(i);
}
return stringArray;
}
/**
* Parses a JSON array into a Java Long array.
*
* @param jsonString the JSON text of an array
* @return the elements as Long objects
*/
public static Long[] jsonToLongArray(String jsonString) {
JSONArray jsonArray = JSONArray.fromObject(jsonString);
int size = jsonArray.size();
Long[] longArray = new Long[size];
for (int i = 0; i < size; i++) {
longArray[i] = jsonArray.getLong(i);
}
return longArray;
}
/**
* Parses a JSON array into a Java Integer array.
*
* @param jsonString the JSON text of an array
* @return the elements as Integer objects
*/
public static Integer[] jsonToIntegerArray(String jsonString) {
JSONArray jsonArray = JSONArray.fromObject(jsonString);
int size = jsonArray.size();
Integer[] integerArray = new Integer[size];
for (int i = 0; i < size; i++) {
integerArray[i] = jsonArray.getInt(i);
}
return integerArray;
}
/**
* Parses a JSON array into a Java Double array.
*
* @param jsonString the JSON text of an array
* @return the elements as Double objects
*/
public static Double[] jsonToDoubleArray(String jsonString) {
JSONArray jsonArray = JSONArray.fromObject(jsonString);
int size = jsonArray.size();
Double[] doubleArray = new Double[size];
for (int i = 0; i < size; i++) {
doubleArray[i] = jsonArray.getDouble(i);
}
return doubleArray;
}
}
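The include mode of `beanToJson` (`nory == true`) turns a list of properties to keep into the exclude list that `JsonConfig.setExcludes` expects: it gathers the declared field names of the bean class and its superclass, then removes the names to keep. The sketch below shows only that step, using a `Set` instead of string replacement. The names `ExcludeListSketch`, `excludesFor`, `BaseBean`, and `UserBean` are hypothetical and exist only for illustration; nothing here calls json-lib.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class ExcludeListSketch {

    // Hypothetical sample beans, used only to exercise the method below.
    public static class BaseBean {
        Long id;
    }

    public static class UserBean extends BaseBean {
        String name;
        String password;
    }

    /**
     * Returns the names of all fields declared on the bean class and its
     * direct superclass, minus the names listed in keep.
     */
    public static String[] excludesFor(Class<?> beanClass, String... keep) {
        Set<String> names = new LinkedHashSet<String>();
        for (Field field : beanClass.getDeclaredFields()) {
            // skip compiler-generated fields
            if (!field.isSynthetic()) {
                names.add(field.getName());
            }
        }
        Class<?> parent = beanClass.getSuperclass();
        if (parent != null) {
            for (Field field : parent.getDeclaredFields()) {
                if (!field.isSynthetic()) {
                    names.add(field.getName());
                }
            }
        }
        names.removeAll(Arrays.asList(keep));
        return names.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Keep only "name": every other declared field ends up excluded.
        System.out.println(Arrays.toString(excludesFor(UserBean.class, "name")));
    }
}
```

The resulting array could then be passed to a method such as `configJson` in the class above.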
Spark utility class (mainly responsible for initializing the SparkContext):
package com.zzrenfeng.zhsx.spark.conf;
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;
@Component
public class ApplicationConfiguration implements Serializable{
private static final long serialVersionUID = 1L;
public SparkConf sparkconf(){
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("wc");
return conf;
}
public JavaSparkContext javaSparkContext(){
return new JavaSparkContext(sparkconf());
}
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
return new PropertySourcesPlaceholderConfigurer();
}
public String filePath(){
// Backslashes must be escaped in a Java string literal
return "E:\\测试文件\\nlog.txt";
}
}
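`javaSparkContext()` above builds a new context on every call, and Spark 1.6 by default allows only one active SparkContext per JVM, so overlapping requests could fail. One possible alternative is to create the context lazily once and reuse it. The sketch below shows that pattern in plain Java 8 with a generic holder. `LazyHolder` is a hypothetical name, and the `Supplier` stands in for something like `new JavaSparkContext(sparkconf())`; this is an illustration under those assumptions, not the approach used in this project.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyHolder<T> {
    private final Supplier<T> factory;
    private T instance;

    public LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the instance on first use, then returns the same one. */
    public synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // Stand-in for an expensive object such as a JavaSparkContext.
        LazyHolder<String> holder =
                new LazyHolder<>(() -> "context-" + created.incrementAndGet());
        System.out.println(holder.get());   // prints context-1
        System.out.println(holder.get());   // prints context-1 again
        System.out.println(created.get());  // prints 1: the factory ran once
    }
}
```

If a shared context were used this way, the service would no longer close it after each job.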
WordCount model class (wraps a word and its count):
package com.zzrenfeng.zhsx.spark.domain;
import java.io.Serializable;
public class WordCount implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String word;
private Integer count;
public WordCount(){}
public WordCount(String v1, int l) {
word = v1;
count = l;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return "WordCount [word=" + word + ", count=" + count + "]";
}
}
Spark service class, mainly responsible for the logic of the Spark word count job:
package com.zzrenfeng.zhsx.spark.service;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import com.zzrenfeng.zhsx.spark.conf.ApplicationConfiguration;
import com.zzrenfeng.zhsx.spark.domain.WordCount;
@Component
public class SparkServiceTest implements java.io.Serializable{
@Autowired
ApplicationConfiguration applicationConfiguration;
public List<WordCount> doWordCount(){
JavaSparkContext javaSparkContext = applicationConfiguration.javaSparkContext();
try {
// Read the input file as an RDD of lines
JavaRDD<String> file = javaSparkContext.textFile(applicationConfiguration.filePath());
// Split every line into words (in Spark 1.x, flatMap returns an Iterable)
JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
// Wrap each word with an initial count of 1
JavaRDD<WordCount> wordcount = words.map(new Function<String, WordCount>() {
@Override
public WordCount call(String v1) throws Exception {
return new WordCount(v1,1);
}
});
// Turn each WordCount into a (word, count) pair
JavaPairRDD<String, Integer> pairwordCount = wordcount.mapToPair(new PairFunction<WordCount, String, Integer>() {
@Override
public Tuple2<String, Integer> call(WordCount t) throws Exception {
return new Tuple2<>(t.getWord() , Integer.valueOf(t.getCount()));
}
});
// Sum the counts of identical words
JavaPairRDD<String, Integer> wordCounts = pairwordCount.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
// Convert the pairs back into WordCount objects
JavaRDD<WordCount> result = wordCounts.map(new Function<Tuple2<String,Integer>, WordCount>() {
@Override
public WordCount call(Tuple2<String, Integer> v1) throws Exception {
return new WordCount(v1._1,v1._2);
}
});
// collect() brings the results back to the driver
List<WordCount> list = result.collect();
System.out.println(list.toString());
return list;
} finally {
// Always release the context, even if the job fails
javaSparkContext.close();
}
}
}
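To make the pipeline above easier to follow, here is a sketch of the same steps (split each line on spaces, pair each word with 1, sum the counts per word) over an in-memory list, using Java 8 streams rather than Spark. `LocalWordCount` and `count` are hypothetical names; the sketch illustrates the logic only, not Spark's distributed execution.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalWordCount {

    /** Counts words across the given lines, splitting on single spaces. */
    public static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // same role as flatMap in the Spark job: one line becomes many words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // same role as mapToPair plus reduceByKey: (word, 1) summed per word
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b a", "b c");
        System.out.println(count(lines)); // prints {a=2, b=2, c=1}
    }
}
```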
Controller layer, mainly responsible for handling incoming requests:
package com.zzrenfeng.zhsx.controller;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.zzrenfeng.zhsx.spark.domain.WordCount;
import com.zzrenfeng.zhsx.spark.service.SparkServiceTest;
import com.zzrenfeng.zhsx.util.JsonUtil;
@Controller
@RequestMapping("hello")
public class ControllerTest {
@Autowired
private SparkServiceTest sparkServiceTest;
@RequestMapping(value = "wc", produces = "application/json;charset=UTF-8")
@ResponseBody
public String wordCount(){
List<WordCount> list = sparkServiceTest.doWordCount();
return JsonUtil.listToJson(list);
}
}
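Start the application, then enter the URL mapped above (hello/wc under the application's context path) in the browser to see the result shown at the beginning.
Because this is a web interface, it can be called from any kind of client, even from programs written in other languages.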
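As an illustration of calling the endpoint from another program, the sketch below fetches a URL with the JDK's `HttpURLConnection` and returns the body as a string. The class name `WordCountClient` and the address `http://localhost:8080/myapp/hello/wc` are assumptions; the real host, port, and context path depend on how the application is deployed in Tomcat.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class WordCountClient {

    /** Sends a GET request and returns the response body as text. */
    public static String fetch(String address) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(address).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(60000); // a Spark job may take a while to finish
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Assumed deployment address; replace with the real host, port, and context path.
        String url = args.length > 0 ? args[0] : "http://localhost:8080/myapp/hello/wc";
        try {
            System.out.println(fetch(url));
        } catch (IOException e) {
            System.err.println("Request failed: " + e.getMessage());
        }
    }
}
```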
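Now you can happily write Spark code. Some may ask: wouldn't it be better to develop Spark applications in Scala?
In my view, Scala is a good fit for pure data processing and a real pleasure to write, but when integrating with other systems Java is still the better choice; after all, when problems come up you can discuss them with experienced Java developers.
Anyone interested is welcome to join the discussion.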