Spring boot project: RedisTemplate implements lightweight message queuing

Background: there is a demand for the company's project. The front-end uploads excel files, the back-end reads data, processes data, and returns error data. The simplest way is to process them synchronously. After the client uploads the files, it is always blocked and waits for response, but the user experience is undoubtedly poor. It may take ten minutes to process the data, and no one is willing to be stupid. Because the project does not use ActiveMQ and other Message Queuing Middleware, And redis's lpush and rpop are very suitable to be implemented as a lightweight message queue, so use it to complete this function development

1, Knowledge points involved in this paper

  1. Reading and writing excel files -- alieasyexc SDK

  2. File upload and download -- Tencent cloud object storage

  3. Remote service call -- restTemplate

  4. Producers and consumers -- redisTemplate leftPush and rightPop operations

  5. Asynchronously processing data -- Executors thread pool

  6. Read network file stream -- HttpClient

  7. User defined annotation implements user identity authentication -- JWT token authentication, interceptor intercepts request entry marked with @ LoginRequired annotation

Of course, Java implementation involves a lot of knowledge points. Each knowledge point can be used as a special topic for learning and analysis. In this paper, the complete implementation will be presented and later split to share learning with small partners

2, Project directory structure


Project structure

Note: the database DAO layer is put into another module, which is not the focus of this article

3, Major maven dependencies

easyexcel


JWT


redis


Tencent cos


Four, process

  1. User upload file

  2. Store files to Tencent cos

  3. Save the uploaded file id and upload record to the database

  4. Redis produces an import message that saves the file id to redis

  5. End of request, return to "in process" status

  6. redis consumption message

  7. Read cos file and process data asynchronously

  8. Upload the error data to cos in the form of excel for users to download, and update the processing status to "processing completed"

  9. The client polls the processing status and can download the error file

  10. End

5, Achieving results

  1. Upload file
    Upload files

  2. Database import record
    Database import record

  3. Imported data
    Imported data

  4. Download error file
    Download error file

  5. Error data prompt
    Error data prompt

  6. Query import record
    Query import record

6, Code implementation

1. Import excel control layer

    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }

2. service layer

    @Override    public JsonResponse doImport(MultipartFile file, Integer userId) {        if (null == file || file.isEmpty()) {            throw new ServiceException("File cannot be empty");
        }

        String filename = file.getOriginalFilename();        if (!checkFileSuffix(filename)) {            throw new ServiceException("Currently only supported xlsx Formatted excel");
        }        //Store files
        String fileId = saveToOss(file);        if (StringUtils.isBlank(fileId)) {            throw new ServiceException("File upload failed, Please try again later");
        }        //Save records to database
        saveRecordToDB(userId, fileId, filename);        //Produce an order import message
        redisProducer.produce(RedisKey.orderImportKey, fileId);        return JsonResponse.ok("Successful import, In processing...");
    }    /**
     * Verify file format
     * @param fileName
     * @return
     */
    private static boolean checkFileSuffix(String fileName) {        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {            return false;
        }        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();        if (".xlsx".equals(suffix)) {            return true;
        }        return false;
    }   /**
     * Store files to Tencent OSS
     * @param file
     * @return
     */
    private String saveToOss(MultipartFile file) {
        InputStream ins = null;        try {
            ins = file.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }

        String fileId;        try {
            String originalFilename = file.getOriginalFilename();            File f = new File(originalFilename);
            inputStreamToFile(ins, f);
            FileSystemResource resource = new FileSystemResource(f);

            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }        return fileId;
    }

3. redis producers


4. redis consumer

@Servicepublic class RedisConsumer {    @Autowired
    public RedisTemplate redisTemplate;    @Value("${txOssFileUrl}")
    private String txOssFileUrl;    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;    @PostConstruct
    public void init() {
        processOrderImport();
    }    /**
     * Process order import
     */
    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {            while (true) {
                Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);                if (null == object) {                    continue;
                }
                String msg = JSON.toJSONString(object);
                executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
            }
        });
    }

}

5. Process task thread class

public class OrderImportTask implements Runnable {
    public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {        this.msg = msg;        this.txOssFileUrl = txOssFileUrl;        this.txOssUploadUrl = txOssUploadUrl;
    }
}    /**     * Injection bean*/
    private void autowireBean() {        this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);        this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);        this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
    }    @Override
    public void run() {        //Injecting bean
        autowireBean();

        JSONObject jsonObject = JSON.parseObject(msg);        String fileId = jsonObject.getString("fileId");

        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("id", fileId);

        ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);        String fileUrl = (String) responseResult.getData();        if (StringUtils.isBlank(fileUrl)) {            return;
        }

        InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);        List<Object> list = ExcelUtil.read(inputStream);
        process(list, fileId);
    }    /**     * Upload the file to oss * @ param file * @ return*/
    private String saveToOss(File file) {        String fileId;        try {
            FileSystemResource resource = new FileSystemResource(file);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }        return fileId;
    }

Note: the business logic code for data processing does not need to be pasted

6. Upload files to cos

    @RequestMapping("/txOssUpload")
    @ResponseBody    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {        if (null == file || file.isEmpty()) {            return ResponseResult.fail("File cannot be empty");
        }        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename);//Solve the problem of Chinese code disorder
        String contentType = getContentType(originalFilename);        String key;

        InputStream ins = null;
        File f = null;        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {            return ResponseResult.fail(e.getMessage());
        } finally {            if (null != ins) {                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }            if (f.exists()) {//Delete temporary files
                f.delete();
            }
        }        return ResponseResult.ok(key);
    }    public static void inputStreamToFile(InputStream ins,File file) {        try {
            OutputStream os = new FileOutputStream(file);            int bytesRead = 0;            byte[] buffer = new byte[8192];            while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
            os.close();
            ins.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);        try {            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }        return key;
    }    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();        try{            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        }catch (Exception e){
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }

7. Download File

    /**
     * Tencent cloud file download
     * @param response
     * @param id
     * @return
     */
    @RequestMapping("/txOssDownload")    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        FileOutputStream fos = null;        response.reset();
        OutputStream os = null;
        try {            response.setContentType(contentType + "; charset=utf-8");            if(!contentType.equals(PlConstans.FileContentType.image)){
                try {                    response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {                    response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                    logger.error("encoding file name failed", e);
                }
            }

            os = response.getOutputStream();

            byte[] b = new byte[1024 * 1024];            int len;            while ((len = fileStream.read(b)) > 0) {
                os.write(b, 0, len);
                os.flush();
                try {                    if(fos != null) {
                        fos.write(b, 0, len);
                        fos.flush();
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
            }
        } catch (IOException e) {
            IOUtils.closeQuietly(fos);
            fos = null;
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);            if(fos != null) {
                IOUtils.closeQuietly(fos);
            }
        }
    }

8. Read network file stream

    /**
     * Read network file stream
     * @param url
     * @return
     */
    public static InputStream readFileFromURL(String url) {        if (StringUtils.isBlank(url)) {            return null;
        }

        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);        try {
            HttpResponse response = httpClient.execute(methodGet);            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }        return null;
    }

9,ExcelUtil

    /**
     * Read excel
     * @param inputStream File input stream
     * @return list aggregate
     */
    public static List<Object> read(InputStream inputStream) {        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }    /**
     * Write excel
     * @param data list data
     * @param clazz
     * @param saveFilePath File save path
     * @throws IOException
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        OutputStream out = new FileOutputStream(tempFile);
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
        out.close();
    }

Note: so far, the whole process is complete, and other knowledge point codes are pasted for reference

Seven, others

1. @ LoginRequired annotation

/**
 * Use this annotation on the method of the Controller that requires login authentication
 */@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2,MyControllerAdvice

@ControllerAdvicepublic class MyControllerAdvice {    @ResponseBody
    @ExceptionHandler(TokenValidationException.class)
    public JsonResponse tokenValidationExceptionHandler() {        return JsonResponse.loginInvalid();
    }    @ResponseBody
    @ExceptionHandler(ServiceException.class)
    public JsonResponse serviceExceptionHandler(ServiceException se) {        return JsonResponse.fail(se.getMsg());
    }    @ResponseBody
    @ExceptionHandler(Exception.class)
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();        return JsonResponse.fail(e.getMessage());
    }

}

3,AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {    private static final String CURRENT_USER = "user";    @Autowired
    private UserService userService;    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {        //If it is not mapped to a method directly through
        if (!(handler instanceof HandlerMethod)) {            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();        //Determine whether the interface has @ LoginRequired annotation. If yes, login is required
        LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);        if (methodAnnotation != null) {            //Verify token
            Integer userId = JwtUtil.verifyToken(request);
            PLUser plUser = userService.selectByPrimaryKey(userId);            if (null == plUser) {                throw new RuntimeException("User does not exist, please login again");
            }
            request.setAttribute(CURRENT_USER, plUser);            return true;
        }        return true;
    }    @Override
    public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }    @Override
    public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
    }
}

4,JwtUtil

    public static final long EXPIRATION_TIME = 2592_000_000L; //Valid for 30 days
    public static final String SECRET = "pl_token_secret";    public static final String HEADER = "token";    public static final String USER_ID = "userId";    /**
     * Generate token based on userId
     * @param userId
     * @return
     */
    public static String generateToken(String userId) {        HashMap<String, Object> map = new HashMap<>();        map.put(USER_ID, userId);        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();        return jwt;
    }    /**
     * Verify token
     * @param request
     * @return userId returned after verification
     */
    public static Integer verifyToken(HttpServletRequest request) {        String token = request.getHeader(HEADER);        if (token != null) {            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();                for (Map.Entry entry : body.entrySet()) {                    Object key = entry.getKey();                    Object value = entry.getValue();                    if (key.toString().equals(USER_ID)) {                        return Integer.valueOf(value.toString());// userId
                    }
                }                return null;
            } catch (Exception e) {
                logger.error(e.getMessage());                throw new TokenValidationException("unauthorized");
            }
        } else {            throw new TokenValidationException("missing token");
        }
    }


Tags: Java Excel Redis Database network

Posted on Mon, 13 Apr 2020 01:16:19 -0700 by raveman