zoukankan      html  css  js  c++  java
  • 基于TCP/IP协议的socket通讯server

    思路:

    socket必须要随项目启动时启动,所以需用Spring自带的监听器,需要保持长连接,要用死循环,所以必须另外起线程,不能阻碍主线程运行

    1.在项目的web.xml中配置listener

    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
      </listener>
    <listener>
        <listener-class>com.ra.car.utils.MyListener</listener-class>
      </listener>
    

    2.因为是一个独立的线程,所以需要调用的注入类不能通过@resource或@aotowire注入,需要应用上下文获取

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    	xmlns:mvc="http://www.springframework.org/schema/mvc"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:aop="http://www.springframework.org/schema/aop" 
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xmlns:task="http://www.springframework.org/schema/task"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans 
    		http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
    		http://www.springframework.org/schema/mvc 
    		http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
    		http://www.springframework.org/schema/context 
    		http://www.springframework.org/schema/context/spring-context-4.0.xsd 
    		http://www.springframework.org/schema/aop 
    		http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
    		http://www.springframework.org/schema/tx 
    		http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 
    		http://www.springframework.org/schema/task 
    		http://www.springframework.org/schema/task/spring-task-4.0.xsd">
    		
    	<!-- 扫描包加载Service实现类 -->
    	<context:component-scan base-package="com.ra.*.service.impl"></context:component-scan>
    	 <bean id="DataCallBackService" class="com.ra.truck.service.impl.DataCallBackServiceImpl"/>
    	 <bean id="RdTrackInfoService" class="com.ra.truck.service.impl.RdTrackInfoServiceImpl"/>
    	 <bean id="OutInterfaceService" class="com.ra.truck.service.impl.OutInterfaceImpl"/>
    	 <bean id="RdPhotoInfoService" class="com.ra.truck.service.impl.RdPhotoInfoServiceImpl"/>
    	<bean id="MessagePackegerService" class="com.ra.truck.service.impl.MessagePackegerServiceImpl"/>
    	 <!--<bean id="redis" class="com.ra.redis.service.impl.JedisClientCluster"/>-->
    </beans>
    

      

    3.创建listener监听器类

    package com.ra.car.utils;
    
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.ra.car.rabbitMQ.PBWRabbitMQCustomer;
    import com.ra.car.rabbitMQ.RabbitMQCustomer;
    
    /**
     * listener监听器类
     * 
     */
    public class MyListener implements ServletContextListener {
    
        protected static final Logger logge = LoggerFactory
                .getLogger(MyListener.class);
    
        @Override
        public void contextInitialized(ServletContextEvent arg0) {
            //必须单独启线程去跑listener
            Mythread myThread = new Mythread();
            //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
    //        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    //        cachedThreadPool.execute(myThread);
            Thread thread = new Thread(myThread);  
            thread.start();
            //启动MQTT
    //        MQTTSubMsg client = new MQTTSubMsg();
    //        client.start();
            RabbitMQCustomer customer=new RabbitMQCustomer();
            Thread threadCustomer = new Thread(customer);
            threadCustomer.start();
            
            PBWRabbitMQCustomer pbwcustomer=new PBWRabbitMQCustomer();
            Thread pbwT = new Thread(pbwcustomer);
            pbwT.start();
        }
    
        @Override
        public void contextDestroyed(ServletContextEvent arg0) {
            logge.info("进入ListenerUtil的contextDestroyed方法.........");
        }
    
    }
    package com.ra.car.utils;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 多线程类
     *
     */
    public class Mythread implements Runnable{
    
        protected static final Logger logge = LoggerFactory
                .getLogger(Mythread.class);
        
        @Override
        public void run() {
            logge.info("进入ListenerUtil的contextInitialized方法.........");
            try {
                ServerSocket serverSocket = new ServerSocket(8888);
                logge.info("socket通信服务端已启动,等待客户端连接.......");
                logge.info("我是111111111111111");
                while (true) {
                    Socket socket = serverSocket.accept();// 侦听并接受到此套接字的连接,返回一个Socket对象
                    JavaTCPServer socketThread = new JavaTCPServer(socket);
                    socketThread.run();
                    try {
                        //休眠10毫秒,压力测试50000次连接无压力
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                logge.error("通信服务器启动失败!", e);
            }
        }
        public static String stampToDate(String s){
            Long timestamp = Long.parseLong(s)*1000;  
              String date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(timestamp));
    
            return date;
        }
        
    }
    package com.ra.car.utils;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class JavaTCPServer {
        protected static final Logger logger=LoggerFactory.getLogger(JavaTCPServer.class);
    
        private Socket socket;
        
        public JavaTCPServer(Socket socket) {
            this.socket = socket;
        }
        
        public void run() {
            MyThread2 myThread2=null;
            try {
                myThread2 = new MyThread2(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
            ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
            cachedThreadPool.execute(myThread2);
        }
    
        
         
    }
    package com.ra.car.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.ra.truck.model.RdDeviceCallBackDataDomain;
    import com.ra.truck.service.DataCallBackService;
    import com.ra.truck.service.RdPhotoInfoService;
    import com.ra.truck.service.RdTrackInfoService;
    import com.ra.truck.service.outInterface.OutInterfaceService;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.context.ContextLoader;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    public class MyThread2 implements Runnable {
    
        protected static final Logger logger = LoggerFactory
                .getLogger(MyThread2.class);
    
        private Socket socket;
        private InputStream inputStream;
        private OutputStream outputStream;
        private PrintWriter printWriter;
        
        private int totalCount;  //总数量
    
        private int adasCount; // 传输的ADAS信号数量
        private int gpsCount; // 传输的GPS信号数量
        private DataCallBackService dataCallBackService;//数据回传private SimpleDateFormat df;
    
        public MyThread2(Socket socket) throws IOException {
            this.socket = socket;
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
            printWriter = new PrintWriter(outputStream);
        
            dataCallBackService=(DataCallBackService)
                     ContextLoader.getCurrentWebApplicationContext().getBean("DataCallBackService");
            df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        }
    
        @Override
        public void run() {
            // 根据输入输出流和客户端连接
    
            // 得到一个输入流,接收客户端传递的信息
            // InputStreamReader inputStreamReader = new InputStreamReader(
            // inputStream);// 提高效率,将自己字节流转为字符流
            // bufferedReader = new BufferedReader(inputStreamReader);// 加入缓冲区
            Date timestart = new Date();
            Date timeend = null;
            long minuine = 0;
            int count = 0;
            while (true) {
                try {
                    if (inputStream.available() > 0 == false) {
                        timeend = new Date();
                        minuine = timeend.getTime() - timestart.getTime();
                        if (minuine != 0 && (minuine / 1000) > 60) {
                            break;
                        }
                        continue;
                    } else {
                        timestart = new Date();
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            logger.error("*****线程休眠出现异常*****", e);
                        }
                        count = inputStream.available();
                        byte[] b = new byte[count];
                        int readCount = 0; // 已经成功读取的字节的个数
                        while (readCount < count) {
                            readCount += inputStream.read(b, readCount, count
                                    - readCount);
                        }
                        logger.info("**********当前服务器正在被连接**********");
                        logger.info("正在连接的客户端IP为:"
                                + socket.getInetAddress().getHostAddress());
                        
                        logger.info("当前时间为:" + df.format(new Date()));
                        String data = new String(b, "utf-8");
                        logger.info("传输过来的info:" + data);
                        String id = jsonStringToObject(data);
                        Map<Object, Object> map = new HashMap<Object, Object>();
                        //心跳发送不带id的json数据
                        if (StringUtils.isNotBlank(id)) {
                            map.put("id", id);
                        }
                        map.put("resultCode", "1");
                        map.put("result", "success");
                        printWriter.print(JSON.toJSONString(map) + "
    ");
                        printWriter.flush();
                    }
                } catch (Exception e) {
                    logger.error("数据传输出现异常", e);
                    try {
                        outputStream = socket.getOutputStream();
                    } catch (IOException e1) {
                        logger.error("获取outputStream出现异常");
                    }
                    // 获取一个输出流,向服务端发送信息
                    // printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流
                    Map<Object, Object> map = new HashMap<Object, Object>();
                    map.put("resultCode", "0");
                    map.put("result", "fail");
                    printWriter.print(JSON.toJSONString(map) + "
    ");
                    printWriter.flush();
                }
            }
            try {
                printWriter.close();
                outputStream.close();
                inputStream.close();
                logger.info("30s没有发送数据,服务端主动关闭连接");
                logger.info("被断开的客户端IP为:"
                        + socket.getInetAddress().getHostAddress());
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                logger.info("被断开的时间为:" + df.format(new Date()));
                socket.close();
            } catch (IOException e) {
                logger.error("关闭socket出现异常", e);
    
            }
    
            /*
             * while ((temp = bufferedReader.readLine()) != null) { info += temp;
             * logger.info(bufferedReader.readLine());
             * logger.info("已接收到客户端连接!!!!!!"); logger.info("服务端接收到客户端信息:" +
             * info + ",当前客户端ip为:" + socket.getInetAddress().getHostAddress());
             * logger.info("服务端接收到客户端信息:" + info + ",当前客户端ip为:" +
             * socket.getInetAddress().getHostAddress()); }
             */
    
            /*
             * logger.info("*****测试Redis*****"); JedisClient
             * jedisClient=(JedisClient)
             * ContextLoader.getCurrentWebApplicationContext().getBean("redis");
             * jedisClient.set("testLanHao", "123456789"); String
             * str=jedisClient.get("testLanHao");
             * logger.info("从Redis中取得数据为:"+str);
             * logger.info("*****测试Redis*****");
             */
    
            // ApplicationContext applicationContext=new
            // ClassPathXmlApplicationContext("classpath*:applicationContext-*.xml");
            // RiskManageService
            // riskManageService=applicationContext.getBean(RiskManageService.class);
            // socket单独线程,需要重新加载上下文,扫描的类在applicationContext-service.xml配置
            /*
             * RiskManageService riskManageService=(RiskManageService)
             * ContextLoader.getCurrentWebApplicationContext().getBean("risk");
             * RdRiskEventInfo rdRiskEventInfo=new RdRiskEventInfo();
             * rdRiskEventInfo.setId("10"); try { List<RdPhotoInfo>
             * list=riskManageService.findPhotoInfoByEventId(rdRiskEventInfo);
             * logger.info(list); } catch (ServiceException e) {
             * e.printStackTrace(); }
             */
            // outputStream = socket.getOutputStream();// 获取一个输出流,向服务端发送信息
            // printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流
    
        }
    
        private String jsonStringToObject(String data) {
            //数据解析方法return xx;
        }
        public static Date stampToDate(String s){
            
            Long timestamp = Long.parseLong(s)*1000;  
              Date date = new Date(timestamp);
    
            return date;
        }
    
  • 相关阅读:
    Matlab 绘制三维立体图(以地质异常体为例)
    Azure DevOps的variable group实现array和hashtable参数的传递
    Azure DevOps 利用rest api设置variable group
    Azure AADSTS7000215 其中一种问题的解决
    Power BI 实现实时更新Streaming Dataset
    AAD Service Principal获取azure user list (Microsoft Graph API)
    Matlab 沿三维任意方向切割CT图的仿真计算
    Azure Powershell script检测登陆并部署ARM Template
    Azure KeyVault设置策略和自动化添加secrets键值对
    Azure登陆的两种常见方式(user 和 service principal登陆)
  • 原文地址:https://www.cnblogs.com/lazyInsects/p/8000125.html
Copyright © 2011-2022 走看看