Integrating Spark with Spring to Build a Web API
Published: 2019-06-12


The goal:

Expose a web API for Spark: entering a URL in the browser submits a Spark job and returns the information we need as JSON.

The result:

Requesting the URL returns a word-count result as JSON (shown here with the Postman Chrome extension; typing the URL directly into the browser gives the same output).
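The response body is a JSON array of word/count objects; with illustrative (made-up) values it looks roughly like this:

```json
[
  {"word": "hello", "count": 12},
  {"word": "spark", "count": 7},
  {"word": "spring", "count": 5}
]
```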

Key technologies:

Java, the Spring MVC framework, the Tomcat container, the Spark framework, and the Scala runtime dependencies.

Overall architecture:

The project is a Maven web project; the relevant pom.xml dependencies are:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.11</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.11.11</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.11</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.3.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>4.3.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <version>4.3.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
        <version>4.3.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis</artifactId>
        <version>3.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis-spring</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>commons-dbcp</groupId>
        <artifactId>commons-dbcp</artifactId>
        <version>1.4</version>
    </dependency>
    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
        <version>1.8.9</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.0.18</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.39</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>net.sf.json-lib</groupId>
        <artifactId>json-lib</artifactId>
        <version>2.4</version>
        <classifier>jdk15</classifier>
    </dependency>
</dependencies>

web.xml configuration (only the Spring MVC container is configured here):

<web-app>
    <display-name>Archetype Created Web Application</display-name>

    <servlet>
        <servlet-name>manager</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:springmvc.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>manager</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

    <filter>
        <filter-name>CharacterEncodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>utf-8</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>CharacterEncodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <context-param>
        <param-name>log4jConfigLocation</param-name>
        <param-value>classpath:log4j.properties</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
    </listener>
</web-app>

Next comes the Spring MVC configuration file (springmvc.xml).

That is all the configuration; you can integrate other pieces if you need them. Now on to the code.
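A minimal springmvc.xml for this setup, sketched under the assumption that only component scanning (for the @Controller/@Component classes under com.zzrenfeng.zhsx) and annotation-driven MVC are needed:

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd">

    <!-- scan the project's packages for annotated beans -->
    <context:component-scan base-package="com.zzrenfeng.zhsx"/>

    <!-- enable @RequestMapping / @ResponseBody handling -->
    <mvc:annotation-driven/>
</beans>
```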

The utility class for converting between objects and JSON:

(Why convert manually instead of pulling in Jackson for automatic conversion? In my testing, Jackson interfered with the Spark job and caused it to terminate abnormally.)

package com.zzrenfeng.zhsx.util;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.JsonConfig;

/**
 * Conversion utilities between JSON and JavaBeans.
 *
 * Implemented with the json-lib component; requires
 * json-lib-2.4-jdk15.jar, ezmorph-1.0.6.jar,
 * commons-collections-3.1.jar and commons-lang-2.0.jar.
 */
public class JsonUtil {

    /**
     * Build a Java object from a JSON object string.
     */
    @SuppressWarnings("unchecked")
    public static <T> T jsonToBean(String jsonString, Class<T> beanClass) {
        JSONObject jsonObject = JSONObject.fromObject(jsonString);
        return (T) JSONObject.toBean(jsonObject, beanClass);
    }

    /**
     * Convert a Java object into a JSON string.
     */
    public static String beanToJson(Object bean) {
        JSONObject json = JSONObject.fromObject(bean);
        return json.toString();
    }

    /**
     * Convert a Java object into a JSON string, including (nory == true)
     * or excluding (nory == false) the properties listed in _nory_changes.
     */
    public static String beanToJson(Object bean, String[] _nory_changes, boolean nory) {
        JSONObject json = null;
        if (nory) {
            // keep only the properties in _nory_changes: collect all
            // property names, then exclude everything else
            Field[] fields = bean.getClass().getDeclaredFields();
            String str = "";
            for (Field field : fields) {
                str += (":" + field.getName());
            }
            fields = bean.getClass().getSuperclass().getDeclaredFields();
            for (Field field : fields) {
                str += (":" + field.getName());
            }
            str += ":";
            for (String s : _nory_changes) {
                str = str.replace(":" + s + ":", ":");
            }
            json = JSONObject.fromObject(bean, configJson(str.split(":")));
        } else {
            // exclude the properties listed in _nory_changes
            json = JSONObject.fromObject(bean, configJson(_nory_changes));
        }
        return json.toString();
    }

    private static JsonConfig configJson(String[] excludes) {
        JsonConfig jsonConfig = new JsonConfig();
        jsonConfig.setExcludes(excludes);
        return jsonConfig;
    }

    /**
     * Convert a list of Java objects into a JSON array string.
     */
    public static String beanListToJson(List<?> beans) {
        StringBuffer rest = new StringBuffer();
        rest.append("[");
        int size = beans.size();
        for (int i = 0; i < size; i++) {
            rest.append(beanToJson(beans.get(i)) + ((i < size - 1) ? "," : ""));
        }
        rest.append("]");
        return rest.toString();
    }

    /**
     * Convert a list of Java objects into a JSON array string, applying the
     * same include/exclude rules as beanToJson(bean, _nory_changes, nory).
     */
    public static String beanListToJson(List<?> beans, String[] _nory_changes, boolean nory) {
        StringBuffer rest = new StringBuffer();
        rest.append("[");
        int size = beans.size();
        for (int i = 0; i < size; i++) {
            rest.append(beanToJson(beans.get(i), _nory_changes, nory)
                    + ((i < size - 1) ? "," : ""));
        }
        rest.append("]");
        return rest.toString();
    }

    /**
     * Convert a map into a JSON object string.
     */
    public static String mapToJson(Map<String, Object> map, String[] _nory_changes, boolean nory) {
        String s_json = "{";
        Set<String> key = map.keySet();
        for (Iterator<String> it = key.iterator(); it.hasNext();) {
            String s = it.next();
            if (map.get(s) == null) {
                // skip null values
            } else if (map.get(s) instanceof List<?>) {
                s_json += (s + ":" + JsonUtil.beanListToJson((List<?>) map.get(s), _nory_changes, nory));
            } else {
                JSONObject json = JSONObject.fromObject(map);
                s_json += (s + ":" + json.toString());
            }
            if (it.hasNext()) {
                s_json += ",";
            }
        }
        s_json += "}";
        return s_json;
    }

    /**
     * Build a Java object array from a JSON array string.
     */
    public static Object[] jsonToObjectArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        return jsonArray.toArray();
    }

    public static String listToJson(List<?> list) {
        JSONArray jsonArray = JSONArray.fromObject(list);
        return jsonArray.toString();
    }

    /**
     * Build a list of Java objects from a JSON array string.
     */
    @SuppressWarnings("unchecked")
    public static <T> List<T> jsonToBeanList(String jsonString, Class<T> beanClass) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        JSONObject jsonObject;
        T bean;
        int size = jsonArray.size();
        List<T> list = new ArrayList<T>(size);
        for (int i = 0; i < size; i++) {
            jsonObject = jsonArray.getJSONObject(i);
            bean = (T) JSONObject.toBean(jsonObject, beanClass);
            list.add(bean);
        }
        return list;
    }

    /**
     * Parse a JSON array string into a String array.
     */
    public static String[] jsonToStringArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        String[] stringArray = new String[size];
        for (int i = 0; i < size; i++) {
            stringArray[i] = jsonArray.getString(i);
        }
        return stringArray;
    }

    /**
     * Parse a JSON array string into a Long array.
     */
    public static Long[] jsonToLongArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Long[] longArray = new Long[size];
        for (int i = 0; i < size; i++) {
            longArray[i] = jsonArray.getLong(i);
        }
        return longArray;
    }

    /**
     * Parse a JSON array string into an Integer array.
     */
    public static Integer[] jsonToIntegerArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Integer[] integerArray = new Integer[size];
        for (int i = 0; i < size; i++) {
            integerArray[i] = jsonArray.getInt(i);
        }
        return integerArray;
    }

    /**
     * Parse a JSON array string into a Double array.
     */
    public static Double[] jsonToDoubleArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Double[] doubleArray = new Double[size];
        for (int i = 0; i < size; i++) {
            doubleArray[i] = jsonArray.getDouble(i);
        }
        return doubleArray;
    }
}
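The controller later returns JsonUtil.listToJson(list). To show the shape of that output without pulling in json-lib, here is a self-contained sketch; MiniJson and its nested WordCount class are hypothetical stand-ins (json-lib may also order fields differently):

```java
import java.util.Arrays;
import java.util.List;

public class MiniJson {
    // Minimal stand-in for the project's WordCount bean.
    static class WordCount {
        final String word;
        final int count;
        WordCount(String word, int count) { this.word = word; this.count = count; }
    }

    // Serialize a list of WordCount beans into a JSON array string,
    // mimicking the output format of JsonUtil.listToJson.
    static String listToJson(List<WordCount> beans) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < beans.size(); i++) {
            WordCount wc = beans.get(i);
            sb.append("{\"word\":\"").append(wc.word)
              .append("\",\"count\":").append(wc.count).append("}");
            if (i < beans.size() - 1) sb.append(",");
        }
        return sb.append("]").toString();
    }

    public static void main(String[] args) {
        List<WordCount> list = Arrays.asList(new WordCount("hello", 3),
                                             new WordCount("spark", 1));
        System.out.println(listToJson(list));
        // prints: [{"word":"hello","count":3},{"word":"spark","count":1}]
    }
}
```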
The Spark utility class (responsible for initializing the SparkContext):
package com.zzrenfeng.zhsx.spark.conf;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;

@Component
public class ApplicationConfiguration implements Serializable {

    private static final long serialVersionUID = 1L;

    public SparkConf sparkconf() {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("wc");
        return conf;
    }

    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkconf());
    }

    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    public String filePath() {
        return "E:\\测试文件\\nlog.txt";
    }
}
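Note that javaSparkContext() builds a brand-new JavaSparkContext on every call, and the service closes it after each request; that matches this post's per-request design, but context creation is expensive. If you wanted to reuse one context across requests, the usual lazy-holder idiom is sketched below with a hypothetical FakeContext placeholder standing in for JavaSparkContext, so the sketch stays self-contained:

```java
public class ContextHolder {
    // Stand-in for JavaSparkContext, so this sketch has no Spark dependency.
    static class FakeContext {}

    private static FakeContext context;

    // Create the context on first use, then hand back the same instance.
    // Synchronized because servlet requests arrive concurrently.
    static synchronized FakeContext get() {
        if (context == null) {
            context = new FakeContext();
        }
        return context;
    }
}
```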

The WordCount model class (a wrapper for a word/count pair):

package com.zzrenfeng.zhsx.spark.domain;

import scala.Serializable;

public class WordCount implements Serializable {

    private static final long serialVersionUID = 1L;

    private String word;
    private Integer count;

    public WordCount() {
    }

    public WordCount(String v1, int l) {
        word = v1;
        count = l;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "WordCount [word=" + word + ", count=" + count + "]";
    }
}

The Spark service class, responsible for the word-count job logic:

package com.zzrenfeng.zhsx.spark.service;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import scala.Tuple2;

import com.zzrenfeng.zhsx.spark.conf.ApplicationConfiguration;
import com.zzrenfeng.zhsx.spark.domain.WordCount;

@Component
public class SparkServiceTest implements java.io.Serializable {

    @Autowired
    ApplicationConfiguration applicationConfiguration;

    public List<WordCount> doWordCount() {
        JavaSparkContext javaSparkContext = applicationConfiguration.javaSparkContext();

        JavaRDD<String> file = javaSparkContext.textFile(applicationConfiguration.filePath());

        // split each line into words
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });

        // wrap each word as WordCount(word, 1)
        JavaRDD<WordCount> wordcount = words.map(new Function<String, WordCount>() {
            @Override
            public WordCount call(String v1) throws Exception {
                return new WordCount(v1, 1);
            }
        });

        // convert to (word, count) pairs
        JavaPairRDD<String, Integer> pairWordCount = wordcount.mapToPair(
                new PairFunction<WordCount, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(WordCount t) throws Exception {
                return new Tuple2<String, Integer>(t.getWord(), t.getCount());
            }
        });

        // sum the counts per word
        JavaPairRDD<String, Integer> wordCounts = pairWordCount.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // map back to WordCount objects for serialization
        JavaRDD<WordCount> result = wordCounts.map(
                new Function<Tuple2<String, Integer>, WordCount>() {
            @Override
            public WordCount call(Tuple2<String, Integer> v1) throws Exception {
                return new WordCount(v1._1, v1._2);
            }
        });

        List<WordCount> list = result.collect();
        javaSparkContext.close();
        return list;
    }
}
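The pipeline above is a classic map/reduce word count: split lines into words, pair each word with 1, then sum by key. The same counting logic in plain Java with no Spark dependency, just to make the dataflow easy to follow:

```java
import java.util.Map;
import java.util.TreeMap;

public class PlainWordCount {
    // Equivalent of flatMap(split) + map(word -> 1) + reduceByKey(sum).
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>();  // sorted for stable output
        for (String line : lines) {              // flatMap: lines -> words
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);  // reduceByKey analogue
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c = count(new String[]{"a b a", "b a"});
        System.out.println(c);  // prints: {a=3, b=2}
    }
}
```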

The controller layer, which handles the incoming request:

package com.zzrenfeng.zhsx.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.zzrenfeng.zhsx.spark.domain.WordCount;
import com.zzrenfeng.zhsx.spark.service.SparkServiceTest;
import com.zzrenfeng.zhsx.util.JsonUtil;

@Controller
@RequestMapping("hello")
public class ControllerTest {

    @Autowired
    private SparkServiceTest sparkServiceTest;

    @RequestMapping("wc")
    @ResponseBody
    public String wordCount() {
        List<WordCount> list = sparkServiceTest.doWordCount();
        return JsonUtil.listToJson(list);
    }
}

Start the application and open the mapped URL in a browser; you will see the result shown at the beginning.

Because this is a web API, it can be called from any kind of client, even from other languages.

Now we can happily write Spark code. Some may ask: isn't Scala the better fit for Spark development? In my view, pure data processing is a joy to write in Scala, but when integrating with other systems, Java is usually the safer choice; when problems come up, there are plenty of Java experts to discuss them with.

Anyone interested is welcome to join the discussion.

Reprinted from: https://www.cnblogs.com/itboys/p/9326330.html
