package zhunneng;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.alibaba.fastjson.JSON;
import com.bawei.utils.StreamUtil;
import com.liujin.cms.domain.Plan;

/**
 * Imports plan records from the classpath resource {@code /data.txt} and publishes each
 * record as a JSON string to the Kafka topic {@code zhunneng}.
 *
 * <p>Each input line is split on the literal delimiter {@code ||}; the fields are read as
 * {@code name || amount || content || manager || department}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:producer.xml")
public class TestImportData {

    /** Regex for the literal field delimiter "||"; every '|' must be escaped in a regex. */
    private static final String FIELD_DELIMITER_REGEX = "\\|\\|";

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Reads every line of {@code /data.txt}, converts it into a {@link Plan}, serializes the
     * plan with fastjson and sends the JSON to the {@code zhunneng} topic.
     *
     * @throws IOException if closing the resource stream fails
     * @throws IllegalStateException if {@code /data.txt} is not on the classpath
     */
    @Test
    public void testImport() throws IOException {
        // try-with-resources guarantees the classpath stream is released even if parsing fails.
        try (InputStream is = this.getClass().getResourceAsStream("/data.txt")) {
            if (is == null) {
                throw new IllegalStateException("Classpath resource /data.txt not found");
            }

            // Delegate line reading to the project utility class.
            List<String> read = StreamUtil.readLine(is);
            for (String line : read) {
                String[] split = line.split(FIELD_DELIMITER_REGEX);

                String name = split[0];
                double amount = Double.parseDouble(split[1]);
                String manager = split[3];
                String content = split[2];

                // NOTE(review): dept_id is resolved but never applied to the Plan below; the
                // matching setter on Plan is not visible from this file — confirm and wire it in.
                int dept_id = resolveDeptId(split[4]);

                Plan plan = new Plan();
                plan.setName(name);
                plan.setManager(manager);
                plan.setContent(content);
                plan.setAmount(amount);

                String jsonString = JSON.toJSONString(plan);
                kafkaTemplate.send("zhunneng", jsonString);
            }
        }
    }

    /**
     * Maps a department name to its numeric id by substring match.
     *
     * <p>The specific name "准能选煤厂" is tested before the generic "厂"; otherwise the generic
     * match would always win and id 2 could never be returned.
     *
     * @param deptName department text taken from the input line
     * @return the department id, or 0 when no known department name is contained
     */
    private static int resolveDeptId(String deptName) {
        if (deptName.contains("准能选煤厂")) {
            return 2;
        } else if (deptName.contains("厂")) {
            return 1;
        } else if (deptName.contains("洗选车间")) {
            return 3;
        } else if (deptName.contains("生产服务中心")) {
            return 4;
        } else if (deptName.contains("矸电公司")) {
            return 5;
        } else if (deptName.contains("大准铁路公司")) {
            return 6;
        }
        return 0;
    }
}
package com.liujin;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.liujin.cms.domain.CompanyAnnualCheck;

/**
 * Imports company annual-check records from a local text file, stores them in the Redis
 * list {@code check_list}, and then sends that key name to the Kafka topic {@code check}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring-beans.xml")
public class TestportData {

    @Autowired
    RedisTemplate redisTemplate;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Parses the data file line by line (the first line is skipped), builds one
     * {@link CompanyAnnualCheck} per line, pushes all of them to Redis and notifies Kafka.
     *
     * <p>Fields are separated by a single space and read in this order: id, keywords,
     * description, company name, main business products, address, registered capital,
     * annual check date ({@code yyyy/MM/dd}), annual check status.
     *
     * @throws IOException if the file cannot be opened or read
     * @throws ParseException if a date field does not match {@code yyyy/MM/dd}
     */
    @SuppressWarnings({ "unchecked" })
    @Test
    public void testImport() throws IOException, ParseException {
        // 1. Source file. Backslashes must be doubled inside a Java string literal.
        File file = new File("G:\\QQ文件\\专高\\专二\\理论\\data.txt");

        // One formatter for the whole run instead of a new instance per line.
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd");

        ArrayList<CompanyAnnualCheck> list = new ArrayList<CompanyAnnualCheck>();

        // 2. Read the text file; try-with-resources closes the reader on every path.
        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
            br.readLine(); // skip the first line

            String line;
            while ((line = br.readLine()) != null) {
                // 3. Parse one line into an entity.
                String[] split = line.split(" ");

                int id = Integer.parseInt(split[0]);
                String keywords = split[1];
                String description = split[2];
                String companyName = split[3];
                String mainBusinessProducts = split[4];
                String address = split[5];
                String registeredCapital = split[6];
                Date annualCheckDate = dateFormat.parse(split[7]);
                String annualCheckStatus = split[8];

                CompanyAnnualCheck check = new CompanyAnnualCheck();
                check.setId(id);
                check.setKeywords(keywords);
                check.setDescription(description);
                check.setCompanyName(companyName);
                check.setMainBusinessProducts(mainBusinessProducts);
                check.setAddress(address);
                check.setRegisteredCapital(registeredCapital);
                check.setAnnualCheckDate(annualCheckDate);
                check.setAnnualCheckStatus(annualCheckStatus);

                System.out.println(check);
                list.add(check);
            }
        }

        // 4. Once everything is parsed, store the objects in Redis and publish the key to Kafka.
        redisTemplate.opsForList().leftPushAll("check_list", list.toArray());
        System.err.println("保存到redis成功!!!");

        kafkaTemplate.send("check", "check_list");
        System.err.println("已经把Redis的key发送到卡夫卡");
    }
}