Flume自定义文件命名格式(RollFileSink)
Flume自带的本地文件Sink进行存储即RollingFileSink,其主要的官网参数如下
其中sink.rollInterval表示每个多久另起一个文件,比数据以每小时(sink.rollInterval=3600)写一个文件,那么flume就会自启动起开始每小时生成一个新文件,而且文件的名称是以时间戳的的方式命名的,非常不直观友好,比如在2019-12-01 00:00:00开始那第一个文件就是flume-1575129600000(flume-倒是可以通过sink.PathManage.prefix来自定义),这样的不能一样一眼看出文件哪个时间段的,比较常用的可以为flume-2019120100之类的方式,针对这个问题对RollingFileSink进行小小的升级,以实现可以自定义文件命名格式
升级后支持的特性
支持文件名称格式以yyyyMMddHHmmSS年月日时的进行配置
支持同一个rollInterval内重启多次,文件名不会冲突
新增参数
sink.rollInterval=3600 --原有参数不变,但是意义略有不同,原先是rollInterval是自启动程序起文件间隔切换时间。现在表示文件存储时间单位间隔数,如果想要每十分为一个间隔(即如flume-2019120110 flume-2019120120 flume-2019120130。。。),则rollInterval=600,
sink.file.name.timeFormat=yyyyMMddHH --文件的命名方式,可以精确到秒,配合rollInterval,
sink.fileMonitor=20 --监控文件名称切换的时间,(原来这个参数是rollInterval兼任这个意义),例如每20s看下当前时间所属时间段,如果变换就需要进行创建新文件
工程准备
首先自定义一个Flume-sink工程,将原有的RollingFileSink和PathManager拷贝过来,在pom中引入下面三个工程
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-sdk</artifactId><version>1.7.0</version>
</dependency>
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version>
</dependency>
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-configuration</artifactId><version>1.7.0</version>
</dependency>
代码更新如下
RollingFileSink 类,变动部分均加上了中文解释,为节省篇幅,部分较多未动的代码有所省略,已经有所注释,请注意
public class RollingFileSink extends AbstractSink implements Configurable {private static final Logger logger = LoggerFactory.getLogger(RollingFileSink.class);//........省略了类中原有变量,主要的变量定义如下private int fileMonitor;private String fileNameTimeFormat;private PathManager pathController;private long rollInterval;//新加了一个全局变量,当前文件名称private volatile String currentFile;public RollingFileSink() {pathController = new PathManager();shouldRotate = false;}public void configure(Context context) {String directory = context.getString("sink.directory");String rollInterval = context.getString("sink.rollInterval");//.......此处有省略其他参数读取/*** 新增两个参数,文件名前缀和时间格式,如果不配置时间时间则默认是时间戳的方式*/filePrefix=context.getString("sink.file.prefix","flume-");fileNameTimeFormat=context.getString("sink.file.name.timeFormat","");if (rollInterval == null) {this.rollInterval = defaultRollInterval;} else {this.rollInterval = Long.parseLong(rollInterval);}batchSize = context.getInteger("sink.batchSize", defaultBatchSize);fileMonitor=context.getInteger("sink.fileMonitor",10);this.directory = new File(directory);if (sinkCounter == null) {sinkCounter = new SinkCounter(getName());}}//启动函数public void start() {logger.info("Starting {}...", this);sinkCounter.start();super.start();pathController.setBaseDirectory(directory);pathController.setFileNameFormat(fileNameTimeFormat);pathController.setFilePrefix(filePrefix);pathController.setRollInterval((int) rollInterval/60);int fileRoll=fileMonitor;if (rollInterval > 0) {rollService = Executors.newScheduledThreadPool(1,new ThreadFactoryBuilder().setNameFormat("rollingFileSink-roller-" +Thread.currentThread().getId() + "-%d").build());//这里跟原先不同,原先这里参数直接是rollInterval,现在换成了专门的fileMonitorrollService.scheduleAtFixedRate(new Runnable() {public void run() {File file=pathController.getCurrentFile();//在这里获取当前文件名称currentFile=file.getName();shouldRotate = true;}}, fileRoll, fileRoll, TimeUnit.SECONDS);} else {logger.info("RollInterval is not valid, file rolling will not happen.");}logger.info("RollingFileSink {} started.", getName());}//数据处理函数public Sink.Status process() throws EventDeliveryException {//获取当前时间实际应该是那个最新的文件名String newestFileName=pathController.latestFile();//如果当前文件名不是当前时间段最新的文件名,则表示需要切换文件了,关闭上一个在写的文件if (shouldRotate&&(!currentFile.equals(newestFileName))) {if (outputStream != null) {logger.info("Closing file {}", pathController.getCurrentFile());try {serializer.flush();serializer.beforeClose();outputStream.close();sinkCounter.incrementConnectionClosedCount();shouldRotate = false;} catch (IOException e) {sinkCounter.incrementConnectionFailedCount();throw new EventDeliveryException("Unable to rotate file "+ pathController.getCurrentFile() + " while delivering event", e);} finally {serializer = null;outputStream = null;}pathController.rotate();}}//打开新的文件if (outputStream == null) {File currentFile = pathController.getCurrentFile();logger.info("Opening output stream for file {}", currentFile);try {outputStream = new BufferedOutputStream(new FileOutputStream(currentFile));serializer = EventSerializerFactory.getInstance(serializerType, serializerContext, outputStream);serializer.afterCreate();sinkCounter.incrementConnectionCreatedCount();} catch (IOException e) {sinkCounter.incrementConnectionFailedCount();throw new EventDeliveryException("Failed to open file "+ pathController.getCurrentFile() + " while delivering event", e);}}Channel channel = getChannel();Transaction transaction = channel.getTransaction();Event event = null;Sink.Status result = Sink.Status.READY;String content;try {/ .........此处代码跟原先一样,为节省篇幅,此处省略} catch (Exception ex) {transaction.rollback();throw new EventDeliveryException("Failed to process transaction", ex);} finally {transaction.close();}retrn result;}@Overridepublic void stop() {//...stop()函数不变,此处代码跟原先一样,为节省篇幅,此处省略}//...省略其他set方法
}
PathManager 类
public class PathManager {private static final Logger logger = LoggerFactory.getLogger(PathManager.class);private File baseDirectory;private AtomicInteger fileIndex;private File currentFile;private String filePrefix="";private String fileNameTimeFormat="";public void setRollInterval(int rollInterval) {this.rollInterval = rollInterval;}private int rollInterval;public PathManager() {fileIndex = new AtomicInteger();}/*** 自定义文件命名格式,以时间格式如yyyyMMddHH来进行文件命名,而不是时间戳的方式* 将原先方法进行修改如下* @return*/public File nextFile() {String fileName;if(!fileNameTimeFormat.equals("")){SimpleDateFormat sdf=new SimpleDateFormat(fileNameTimeFormat);String startPrefix=sdf.format(new Date());Calendar calendar=Calendar.getInstance();int startMinute=calendar.get(Calendar.MINUTE);int startTime=(startMinute/rollInterval)*rollInterval;startPrefix=startTime<10?startPrefix+"0"+startTime:startPrefix+startTime;fileName= filePrefix+startPrefix;}else{fileName=filePrefix+getSeriesTimestamp()+"-"+fileIndex.incrementAndGet();}currentFile = new File(baseDirectory, fileName);//如果已经存在文件,则将已有文件重命名,一般用在一个时间内多次重复服务的情况if(currentFile.exists()){String newName=fileName+"-"+getRandom();currentFile.renameTo(new File(baseDirectory,newName));currentFile = new File(baseDirectory, fileName);}return currentFile;}/*** 获取当前时间点是该是格式的文件名称* @return*/public String latestFile(){SimpleDateFormat sdf=new SimpleDateFormat(fileNameTimeFormat);String startPrefix=sdf.format(new Date());Calendar calendar=Calendar.getInstance();int startMinute=calendar.get(Calendar.MINUTE);int startTime=(startMinute/rollInterval)*rollInterval;//以每个小时0为起点,rollInterval一个区间,如rollInterval=15则,文件为2019120100,2019120115、2019120130、2019120145startPrefix=startTime<10?startPrefix+"0"+startTime:startPrefix+startTime;String fileName= filePrefix+startPrefix;return fileName;}public File getCurrentFile() {if (currentFile == null) {return nextFile();}return currentFile;}public void rotate() {currentFile = null;}public File getBaseDirectory() {return baseDirectory;}public void setBaseDirectory(File baseDirectory) {this.baseDirectory = baseDirectory;}private long getSeriesTimestamp() {return System.currentTimeMillis();}public void setFilePrefix(String prefix){filePrefix=prefix;}public void setFileNameFormat(String patten){fileNameTimeFormat=patten;}public AtomicInteger getFileIndex() {return fileIndex;}//获取一个随机值public String getRandom(){return String.valueOf((int)(1+Math.random()*(1000)));}
}
Flume自定义文件命名格式(RollFileSink)相关推荐
- java 下载db文件_Java下载文件自定义名称和格式类型
response.setContentType()的作用是使客户端浏览器,区分不同种类的数据,并根据不同的MIME调用浏览器内不同的程序嵌入模块来处理相应的数据,可以设置文件格式.参考数据如下: re ...
- 如何批量给pdf文件命名?
如何批量给pdf文件命名?不知道大家有没有发现,现在很多的文件都是pdf格式的,不管客户发给你的或者你要发给客户的文件,我们都会使用pdf格式,有时候可能因为工作的特殊性,可能会产生很多的pdf文件, ...
- Hadoop控制输出文件命名
参考:http://blog.csdn.net/zuochanxiaoheshang/article/details/8769198 Hadoop 控制输出文件命名 在一般情况下,Hadoop 每一个 ...
- Eclipse - CDT使用GDB调试C++的问题-无源文件命名(No source file named)
CDT使用GDB调试C++的问题(No source file named) 本文地址: http://blog.csdn.net/caroline_wendy/article/details/170 ...
- 如何使 FlashGet 正常合法 下载 Session 中的自定义文件链接呢? JSP/Servlet 实现!
<% //============================================================================================ ...
- 询问HTG:白噪声屏幕保护程序,有效的文件命名以及从密码泄露中恢复
Once a week we share three of the questions we've answered from the Ask HTG inbox with the greater r ...
- 数字电视制播设备间的文件交换格式
在现今的数字电视演播室中,设备之间基本上采用信号流连接方式,如SDI.STDI.模拟YUV.VBS等信号流.在非线性编辑系统和播出系统与服务器之间的连接,还有基于MPEG-2传输流等的信号连接方式.基 ...
- 简述计算机文件的命名办法,如何进行文件命名-如何进行文件管理
如何进行文件命名-如何进行文件管理 电脑的管理事实上就是文件和文件夹的管理.想要我们的电脑干净整齐,就需要我们正确的进行文件管理.我们知道了文件和文件夹的概念之后,现在我们再来看看单独的文件,认识一下 ...
- c语言头文件格式图片_c语言中的.h头文件的格式
#ifndef __digital_h__ #define __digital_h__ #ifndef __cplusplus extern "C"{ #endif #ifdef ...
- php类文件命名规则,phpcms文件命名规则
phpcms文件命名规则 以下是model的目录(M) 文件名必须以_model.class.php 类名要和文件名相同并且继承model 如:文件名为aa_model.class.php 文件内容: ...
最新文章
- Present ViewController详解
- 在飞塔防火墙上实现IPSec ×××
- C六:指针可以比较大小
- Poisson Image Editing 泊松融合 matlab代码完整
- windows apache html5,Windows服务器下的IIS和Apache性能比较
- 取消XP和2000专业版的IIS的用户数量限制
- 运行android程序时显示stop,Android系统.应用程序关闭时WorkManager是否正在运行?
- golang 排序_常用排序算法之冒泡排序
- Python getattr
- ERP软件管理持续升级 抢占产业先机
- 田彩蝶(帮别人名字作诗)
- 【转载】回顾中国股市十年市盈率,我们还看得见希望吗?
- 在plc中用c语言实现电梯控制程序,基于三菱FX2N PLC的两部电梯控制系统设计(附梯形图程序)...
- java ts视频文件转mp4格式在线求助
- 如何使用Java计算闰年?
- Java 水印操作的设计与实现
- Echarts 双柱状图+折线图合并---实现效果详解(vue+Echarts实现)
- Android更换logo之后小米手机和部分三星手机通知栏的小图标未显示正确的图标问题
- 移动端点击出现遮罩块效果
- mysql5.7.10 64_mysql5.7.10win764安装