package utils; import cn.hutool.core.util.RuntimeUtil; import com.alibaba.fastjson.JSON; import com.ruoyi.common.config.RuoYiConfig; import com.ruoyi.common.utils.DateUtils; import com.ruoyi.common.utils.http.HttpUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PreDestroy; import java.io.*; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; /** * rtsp 转 hlv协议 * * @author kzw */ @Service @Slf4j @EnableScheduling public class RtspConvert { //转换map private static ConcurrentHashMap coverMap = new ConcurrentHashMap<>(); //拉流命令 private static final String ffmpegCmd = "ffmpeg -i \"%s\" -c copy -f hls -hls_time 5.0 -hls_list_size 2 -hls_flags 2 %s"; //ffmpeg -i "rtsp://admin:admin123@192.168.1.40:554/stream/realtime?channel=1&streamtype=0" -c copy -f hls -hls_time 4.0 -hls_list_size 2 -hls_flags 2 "D:\video\test.m3u8" @PreDestroy public void closeProcess() { log.info("关闭ffmpeg转换进程,java程序不一定关闭process进程"); closeAllProcess(); for (String ip : coverMap.keySet()) { try { log.error("开始停止{}", ip); coverMap.get(ip).stopTask(); } catch (Exception e) { e.printStackTrace(); } } } /** * ffmpeg -i "rtsp://admin:xxx@192.168.0.251:554/Streaming/Channels/101" -c copy -f hls -hls_time 5.0 -hls_list_size 5 -hls_flags 2 F:\resources\hls\2000\live.m3u8 */ /** * 检查设备ip是否能正常访问 */ private boolean checkDeviceOnline(String ip) { String res = HttpUtils.sendGet("http://" + ip); if (StringUtils.isNotBlank(res)) { return true; } return false; } /** * 转换rtsp并获取hls文件路径 * 视频地址 http.../control-api/upload/hls/{ip}/live.m3u8 */ public String rtsp2Hls(String ip, String userName, String pwd) { System.out.println(JSON.toJSON(coverMap)); if (coverMap.containsKey(ip)) { CoverThread thread = coverMap.get(ip); if (thread == null || thread.getTaskState() != CoverThread.running) { } else { return StringUtils.replace(thread.getM3U8File(), RuoYiConfig.getProfile(), ""); } } String rtspUrl = "rtsp://" + userName + ":" + pwd + "@" + ip + ":554/stream/realtime?channel=1&streamtype=0"; String m3u8File = RuoYiConfig.getProfile() + File.separator+"upload" + File.separator + "hls" // String m3u8File = "D:" + File.separator + "ruoyi" + File.separator + "uploadPath" + File.separator + "hls" + File.separator + ip.replaceAll("\\.", "_") + File.separator + "live.m3u8"; startTransform(ip, rtspUrl, m3u8File, userName, pwd); CoverThread thread = coverMap.get(ip); if (thread != null) { return StringUtils.replace(thread.getM3U8File(), RuoYiConfig.getProfile(), ""); } return null; } /** * 开启转换 */ private void startTransform(String ip, String rtspUrl, String m3u8Path, String userName, String pwd) { log.info("转换rtsp, {},{},{}", ip, rtspUrl, m3u8Path); String memKey = "startLive" + ip; synchronized (memKey.intern()) { if (coverMap.containsKey(ip)) { stopTransform(ip); } CoverThread thread = new CoverThread(ip, rtspUrl, m3u8Path, userName, pwd); coverMap.put(ip, thread); thread.start(); } } /** * 停止转换 */ public void stopTransform(String ip) { String memKey = "startLive" + ip; synchronized (memKey.intern()) { System.out.println(JSON.toJSON(coverMap)); if (coverMap.containsKey(ip)) { CoverThread thread = coverMap.get(ip); if (thread != null && thread.getTaskState() != CoverThread.fail) { System.out.println("停止转换ip"+ip); thread.stopTask(); } } } } /** * 监控所有的转换线程 */ @Scheduled(cron = "0 0/8 * * * ?") public synchronized void monitorThreads() { for (String ip : coverMap.keySet()) { CoverThread thread = coverMap.get(ip); if (thread != null && thread.getTaskState() != CoverThread.running) { //线程出现异常 rtsp2Hls(ip, thread.getUserName(), thread.getPwd()); } } } public void closeAllProcess(){ String command = "taskkill /f /im ffmpeg.exe"; if(!isWin()){ command = "/usr/local/water-monitor-api/closeFFmpeg.sh"; } System.out.println("command ["+command+"]"); String result = RuntimeUtil.execForStr(command); System.out.println("command result ["+result+"]"); coverMap.clear(); } public boolean isWin(){ String os = System.getProperty("os.name"); return os.toLowerCase().startsWith("win"); } public class RunThread extends Thread { private InputStream is; private String printType; RunThread(InputStream is, String printType) { this.is = is; this.printType = printType; } @Override public void run() { try { InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr); String line; while ((line = br.readLine()) != null) { log.info(printType + ">" + line); } } catch (IOException ioe) { log.info("RunThread error:", ioe); } } } /** * 执行命令线程 */ private class CoverThread extends Thread { private String ip; private String rtspUrl; private String m3u8File; private String userName; private String pwd; private int taskState = 0; //运行状态 0未开始 1进行中 -1失败 private static final int notStart = 0; private static final int running = 1; private static final int fail = -1; private Process process = null; CoverThread(String ip, String rtspUrl, String m3u8File, String userName, String pwd) { this.ip = ip; this.rtspUrl = rtspUrl; this.m3u8File = m3u8File; this.userName = userName; this.pwd = pwd; setName("m3u8-" + ip); this.taskState = notStart; } @Override public void run() { try { FileUtils.forceMkdir(new File(m3u8File).getParentFile()); if (!checkDeviceOnline(ip)) { log.warn("设备{},离线", ip); this.taskState = fail; return; } String command = String.format(ffmpegCmd, rtspUrl, m3u8File); this.taskState = running; //判断是操作系统是linu2x还是windows String[] comds; if (isWin()) { comds = new String[]{"cmd", "/c", command}; // comds = new String[]{"cmd", "/c", "start", "/b", "cmd.exe", "/k", command}; } else { comds = new String[]{"/bin/sh", "-c", command}; } // 开始执行命令 log.info("执行命令:" + command); process = Runtime.getRuntime().exec(comds); //开启线程监听(此处解决 waitFor() 阻塞/锁死 问题) new RunThread(process.getInputStream(), "INFO").start(); new RunThread(process.getErrorStream(), "ERROR").start(); int flag = process.waitFor(); log.info("结束{}", ip); } catch (Exception e) { log.error("出现异常" + e.getMessage(), e); this.taskState = fail; } finally { if (process != null) { try { process.exitValue(); } catch (Exception e) { } try { process.destroyForcibly(); } catch (Exception e) { } } } } /** * 获取任务执行状态 */ public int getTaskState() { return taskState; } /** * 立即停止任务 */ public void stopTask() { if (process != null) { try { process.destroy(); } catch (Exception e) { e.printStackTrace(); } } } public String getM3U8File() { return this.m3u8File; } public String getUserName() { return userName; } public String getPwd() { return pwd; } } public void openTask(){ } public static void main(String[] args) throws Exception { RtspConvert convert = new RtspConvert(); convert.closeAllProcess(); String ip = "192.168.1.40"; String userName = "admin"; String pwd = "admin123"; String m3u8 = convert.rtsp2Hls(ip, userName, pwd); System.out.println("***********************************" + m3u8); // Thread.sleep(10 * 1000); // convert.stopTransform(ip); // System.out.println("************************************结束**************"); } }