zoukankan      html  css  js  c++  java
  • 基于TCP/IP协议的socket通讯server

    思路:

    socket服务必须随项目启动而启动,所以需要使用Servlet容器的监听器(ServletContextListener,配合Spring的ContextLoaderListener);服务端要保持长连接,需要在死循环中不断接受连接,所以必须另外起线程,不能阻塞主线程(容器启动线程)的运行

    1.在项目的web.xml中配置listener

    <!-- Spring's ContextLoaderListener; declared first, ahead of the application listener below. -->
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
      </listener>
    <!-- Application listener com.ra.car.utils.MyListener: its contextInitialized starts the socket
         server thread and the RabbitMQ consumer threads.
         NOTE(review): the socket handlers look beans up through the Spring web application context,
         so this listener presumably has to stay after ContextLoaderListener; confirm ordering. -->
    <listener>
        <listener-class>com.ra.car.utils.MyListener</listener-class>
      </listener>
    

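    2.因为socket处理运行在独立的线程中(对象是自己new出来的,不由Spring管理),所以需要调用的Service类不能通过@Resource或@Autowired注入,需要通过应用上下文(ContextLoader.getCurrentWebApplicationContext())获取,相关bean在Spring配置文件中声明: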

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    	xmlns:mvc="http://www.springframework.org/schema/mvc"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:aop="http://www.springframework.org/schema/aop" 
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xmlns:task="http://www.springframework.org/schema/task"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans 
    		http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
    		http://www.springframework.org/schema/mvc 
    		http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
    		http://www.springframework.org/schema/context 
    		http://www.springframework.org/schema/context/spring-context-4.0.xsd 
    		http://www.springframework.org/schema/aop 
    		http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
    		http://www.springframework.org/schema/tx 
    		http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 
    		http://www.springframework.org/schema/task 
    		http://www.springframework.org/schema/task/spring-task-4.0.xsd">
    		
    	<!-- Component scan that loads the Service implementation classes. -->
    	<context:component-scan base-package="com.ra.*.service.impl"></context:component-scan>
    	<!-- Explicitly named service beans. The socket handler thread fetches its service by id,
    	     e.g. getBean("DataCallBackService"), so these ids must match the names used in code.
    	     NOTE(review): if these classes also carry stereotype annotations, the component scan
    	     above may register them a second time under a different name; confirm. -->
    	 <bean id="DataCallBackService" class="com.ra.truck.service.impl.DataCallBackServiceImpl"/>
    	 <bean id="RdTrackInfoService" class="com.ra.truck.service.impl.RdTrackInfoServiceImpl"/>
    	 <bean id="OutInterfaceService" class="com.ra.truck.service.impl.OutInterfaceImpl"/>
    	 <bean id="RdPhotoInfoService" class="com.ra.truck.service.impl.RdPhotoInfoServiceImpl"/>
    	<bean id="MessagePackegerService" class="com.ra.truck.service.impl.MessagePackegerServiceImpl"/>
    	 <!-- Redis client bean, currently disabled. -->
    	 <!--<bean id="redis" class="com.ra.redis.service.impl.JedisClientCluster"/>-->
    </beans>
    

      

    3.创建listener监听器类

    package com.ra.car.utils;
    
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.ra.car.rabbitMQ.PBWRabbitMQCustomer;
    import com.ra.car.rabbitMQ.RabbitMQCustomer;
    
    /**
     * Servlet context listener that starts the application's background workers when the
     * web application is initialized: the TCP socket server ({@link Mythread}) and the two
     * RabbitMQ consumers.
     *
     * <p>Every worker gets its own thread because the socket server runs an endless accept
     * loop and must not block the container thread that invokes this listener.
     */
    public class MyListener implements ServletContextListener {

        protected static final Logger logge = LoggerFactory
                .getLogger(MyListener.class);

        // Worker threads, remembered so contextDestroyed can signal them. Volatile because
        // initialization and destruction are not guaranteed to happen on the same thread.
        private volatile Thread socketServerThread;
        private volatile Thread rabbitConsumerThread;
        private volatile Thread pbwConsumerThread;

        /**
         * Starts the socket server and both RabbitMQ consumers, each on a dedicated thread.
         *
         * @param arg0 the context event supplied by the container (unused)
         */
        @Override
        public void contextInitialized(ServletContextEvent arg0) {
            socketServerThread = startWorker(new Mythread(), "tcp-socket-server");
            rabbitConsumerThread = startWorker(new RabbitMQCustomer(), "rabbitmq-customer");
            pbwConsumerThread = startWorker(new PBWRabbitMQCustomer(), "pbw-rabbitmq-customer");
        }

        /**
         * Logs the shutdown and interrupts the workers started by {@link #contextInitialized}.
         *
         * <p>Interruption is best effort: a worker blocked in a call that does not respond to
         * interrupts (for example {@code ServerSocket.accept()}) only notices it once that call
         * returns. The workers are daemon threads, so they cannot keep the JVM alive either way.
         *
         * @param arg0 the context event supplied by the container (unused)
         */
        @Override
        public void contextDestroyed(ServletContextEvent arg0) {
            logge.info("进入ListenerUtil的contextDestroyed方法.........");
            interruptIfStarted(socketServerThread);
            interruptIfStarted(rabbitConsumerThread);
            interruptIfStarted(pbwConsumerThread);
        }

        /** Runs {@code task} on a new, named daemon thread and returns that thread. */
        private static Thread startWorker(Runnable task, String name) {
            Thread worker = new Thread(task, name);
            // Daemon: an endless worker loop must never prevent the container JVM from exiting.
            worker.setDaemon(true);
            worker.start();
            return worker;
        }

        /** Interrupts {@code worker} unless it was never started. */
        private static void interruptIfStarted(Thread worker) {
            if (worker != null) {
                worker.interrupt();
            }
        }

    }
    package com.ra.car.utils;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Runnable that hosts the TCP server socket: it accepts client connections in a loop and
     * hands each accepted socket to a {@link JavaTCPServer}.
     */
    public class Mythread implements Runnable{

        protected static final Logger logge = LoggerFactory
                .getLogger(Mythread.class);

        /** Port used when no port is given explicitly. */
        private static final int DEFAULT_PORT = 8888;

        /** Port the server socket listens on. */
        private final int port;

        /** Creates a server that listens on the default port 8888. */
        public Mythread() {
            this(DEFAULT_PORT);
        }

        /**
         * Creates a server that listens on the given port.
         *
         * @param port TCP port to bind when {@link #run()} is invoked
         */
        public Mythread(int port) {
            this.port = port;
        }

        /**
         * Binds the server socket and accepts connections until the thread is interrupted or
         * an I/O error occurs. The server socket is always closed before this method returns.
         */
        @Override
        public void run() {
            logge.info("进入ListenerUtil的contextInitialized方法.........");
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(port);
                logge.info("socket通信服务端已启动,等待客户端连接.......");
                logge.info("我是111111111111111");
                while (!Thread.currentThread().isInterrupted()) {
                    // Blocks until a client connects and returns the socket for that connection.
                    Socket socket = serverSocket.accept();
                    new JavaTCPServer(socket).run();
                    try {
                        // Short pause between accepts, kept from the original implementation.
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the accept loop.
                        Thread.currentThread().interrupt();
                        logge.info("监听线程被中断,停止接受新连接");
                    }
                }
            } catch (IOException e) {
                logge.error("通信服务器启动失败!", e);
            } finally {
                if (serverSocket != null) {
                    try {
                        serverSocket.close();
                    } catch (IOException e) {
                        logge.error("关闭ServerSocket出现异常", e);
                    }
                }
            }
        }

        /**
         * Formats a Unix timestamp given in seconds as {@code yyyy-MM-dd HH:mm:ss} using the
         * JVM's default time zone.
         *
         * @param s decimal string holding the number of seconds since the epoch
         * @return the formatted date-time text
         * @throws NumberFormatException if {@code s} is not a parsable long
         */
        public static String stampToDate(String s){
            long millis = Long.parseLong(s) * 1000L;
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(millis));
        }

    }
    package com.ra.car.utils;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Dispatches one accepted client socket to a {@link MyThread2} handler running on a
     * shared worker pool.
     */
    public class JavaTCPServer {
        protected static final Logger logger=LoggerFactory.getLogger(JavaTCPServer.class);

        /**
         * Pool shared by all connections. Creating a new pool per connection (as was done
         * before) leaves behind one never-shut-down executor for every client.
         */
        private static final ExecutorService HANDLER_POOL = Executors.newCachedThreadPool();

        private final Socket socket;

        /**
         * @param socket the accepted client connection to serve
         */
        public JavaTCPServer(Socket socket) {
            this.socket = socket;
        }

        /**
         * Creates the handler for this connection and submits it to the shared pool. When the
         * handler cannot be created, the failure is logged and the socket is closed instead of
         * submitting a null task.
         */
        public void run() {
            MyThread2 handler;
            try {
                handler = new MyThread2(socket);
            } catch (IOException e) {
                logger.error("创建socket处理线程失败", e);
                closeSocket();
                return;
            }
            HANDLER_POOL.execute(handler);
        }

        /** Closes the client socket, logging any failure. */
        private void closeSocket() {
            try {
                socket.close();
            } catch (IOException e) {
                logger.error("关闭socket出现异常", e);
            }
        }

    }
    package com.ra.car.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.ra.truck.model.RdDeviceCallBackDataDomain;
    import com.ra.truck.service.DataCallBackService;
    import com.ra.truck.service.RdPhotoInfoService;
    import com.ra.truck.service.RdTrackInfoService;
    import com.ra.truck.service.outInterface.OutInterfaceService;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.context.ContextLoader;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    public class MyThread2 implements Runnable {
    
        protected static final Logger logger = LoggerFactory
                .getLogger(MyThread2.class);

        // Client connection and the streams obtained from it.
        private Socket socket;
        private InputStream inputStream;
        private OutputStream outputStream;
        private PrintWriter printWriter;

        private int totalCount;  // total count

        private int adasCount; // number of ADAS signals transferred
        private int gpsCount; // number of GPS signals transferred
        private DataCallBackService dataCallBackService; // data call-back service
        // Timestamp formatter assigned in the constructor. This declaration had been swallowed
        // by the trailing comment on the previous line, which left df undeclared.
        private SimpleDateFormat df;
    
        public MyThread2(Socket socket) throws IOException {
            this.socket = socket;
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
            printWriter = new PrintWriter(outputStream);
        
            dataCallBackService=(DataCallBackService)
                     ContextLoader.getCurrentWebApplicationContext().getBean("DataCallBackService");
            df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        }
    
        @Override
        public void run() {
            // 根据输入输出流和客户端连接
    
            // 得到一个输入流,接收客户端传递的信息
            // InputStreamReader inputStreamReader = new InputStreamReader(
            // inputStream);// 提高效率,将自己字节流转为字符流
            // bufferedReader = new BufferedReader(inputStreamReader);// 加入缓冲区
            Date timestart = new Date();
            Date timeend = null;
            long minuine = 0;
            int count = 0;
            while (true) {
                try {
                    if (inputStream.available() > 0 == false) {
                        timeend = new Date();
                        minuine = timeend.getTime() - timestart.getTime();
                        if (minuine != 0 && (minuine / 1000) > 60) {
                            break;
                        }
                        continue;
                    } else {
                        timestart = new Date();
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            logger.error("*****线程休眠出现异常*****", e);
                        }
                        count = inputStream.available();
                        byte[] b = new byte[count];
                        int readCount = 0; // 已经成功读取的字节的个数
                        while (readCount < count) {
                            readCount += inputStream.read(b, readCount, count
                                    - readCount);
                        }
                        logger.info("**********当前服务器正在被连接**********");
                        logger.info("正在连接的客户端IP为:"
                                + socket.getInetAddress().getHostAddress());
                        
                        logger.info("当前时间为:" + df.format(new Date()));
                        String data = new String(b, "utf-8");
                        logger.info("传输过来的info:" + data);
                        String id = jsonStringToObject(data);
                        Map<Object, Object> map = new HashMap<Object, Object>();
                        //心跳发送不带id的json数据
                        if (StringUtils.isNotBlank(id)) {
                            map.put("id", id);
                        }
                        map.put("resultCode", "1");
                        map.put("result", "success");
                        printWriter.print(JSON.toJSONString(map) + "
    ");
                        printWriter.flush();
                    }
                } catch (Exception e) {
                    logger.error("数据传输出现异常", e);
                    try {
                        outputStream = socket.getOutputStream();
                    } catch (IOException e1) {
                        logger.error("获取outputStream出现异常");
                    }
                    // 获取一个输出流,向服务端发送信息
                    // printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流
                    Map<Object, Object> map = new HashMap<Object, Object>();
                    map.put("resultCode", "0");
                    map.put("result", "fail");
                    printWriter.print(JSON.toJSONString(map) + "
    ");
                    printWriter.flush();
                }
            }
            try {
                printWriter.close();
                outputStream.close();
                inputStream.close();
                logger.info("30s没有发送数据,服务端主动关闭连接");
                logger.info("被断开的客户端IP为:"
                        + socket.getInetAddress().getHostAddress());
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                logger.info("被断开的时间为:" + df.format(new Date()));
                socket.close();
            } catch (IOException e) {
                logger.error("关闭socket出现异常", e);
    
            }
    
            /*
             * while ((temp = bufferedReader.readLine()) != null) { info += temp;
             * logger.info(bufferedReader.readLine());
             * logger.info("已接收到客户端连接!!!!!!"); logger.info("服务端接收到客户端信息:" +
             * info + ",当前客户端ip为:" + socket.getInetAddress().getHostAddress());
             * logger.info("服务端接收到客户端信息:" + info + ",当前客户端ip为:" +
             * socket.getInetAddress().getHostAddress()); }
             */
    
            /*
             * logger.info("*****测试Redis*****"); JedisClient
             * jedisClient=(JedisClient)
             * ContextLoader.getCurrentWebApplicationContext().getBean("redis");
             * jedisClient.set("testLanHao", "123456789"); String
             * str=jedisClient.get("testLanHao");
             * logger.info("从Redis中取得数据为:"+str);
             * logger.info("*****测试Redis*****");
             */
    
            // ApplicationContext applicationContext=new
            // ClassPathXmlApplicationContext("classpath*:applicationContext-*.xml");
            // RiskManageService
            // riskManageService=applicationContext.getBean(RiskManageService.class);
            // socket单独线程,需要重新加载上下文,扫描的类在applicationContext-service.xml配置
            /*
             * RiskManageService riskManageService=(RiskManageService)
             * ContextLoader.getCurrentWebApplicationContext().getBean("risk");
             * RdRiskEventInfo rdRiskEventInfo=new RdRiskEventInfo();
             * rdRiskEventInfo.setId("10"); try { List<RdPhotoInfo>
             * list=riskManageService.findPhotoInfoByEventId(rdRiskEventInfo);
             * logger.info(list); } catch (ServiceException e) {
             * e.printStackTrace(); }
             */
            // outputStream = socket.getOutputStream();// 获取一个输出流,向服务端发送信息
            // printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流
    
        }
    
        /**
         * Parses one message received from the client.
         *
         * <p>The caller in {@code run()} passes the received bytes decoded as utf-8 text and
         * echoes the returned value back as the {@code id} of its reply when it is not blank.
         *
         * @param data the message text received from the client
         * @return the value the caller uses as the reply id
         */
        private String jsonStringToObject(String data) {
            // Data-parsing logic; the body is omitted in this listing (placeholder: return xx;)
        }
        /**
         * Converts a Unix timestamp expressed in seconds into a {@link Date}.
         *
         * @param s decimal string holding the number of seconds since the epoch
         * @return the corresponding date
         * @throws NumberFormatException if {@code s} is not a parsable long
         */
        public static Date stampToDate(String s){
            // Seconds to milliseconds, which is the unit Date expects.
            return new Date(Long.parseLong(s) * 1000);
        }
    
  • 相关阅读:
    mock模拟数据的使用方法
    mac下载wepy报错解决方案
    收集:40种js常用技巧
    学习——面试现场整理的笔记
    mac又更新系统了!!!
    H5的优化方案
    双十一到了,把自己学习的运营笔记发一部分
    mongodb操作笔记
    HTTP协议及常见状态码
    跨域解决方案
  • 原文地址:https://www.cnblogs.com/lazyInsects/p/8000125.html
Copyright © 2011-2022 走看看