如何入门ApacheFlink中的Flinksink

如何入门ApacheFlink中的Flinksink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

10余年的兴和网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。营销型网站的优势是能够根据用户设备显示端的尺寸不同,自动调整兴和建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。创新互联建站从事“兴和网站设计”,“兴和网站推广”以来,每个客户项目都认真落实执行。

将DataSet中的数据Sink到哪里去。使用的是对应的OutPutFormat,也可以使用自定义的sink,有可能写到hbase中,hdfs中。

  • writeAsText() / TextOutputFormat ,以String的形式写入

  • writeAsCsv(...) / CsvOutputFormat,以CSV的方式写进去

  • print() / printToErr() / print(String msg) / printToErr(String msg)以标准输出

 writeAsText

object DataSetSinkApp {
  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val data = 1.to(10)
    val text = environment.fromCollection(data)
    val filePath = "E:/test"
    text.writeAsText(filePath)
    environment.execute("DataSetSinkApp")
  }
}

如果E:/test文件或者文件夹存在,将无法执行成功。除非增加一个WriteMode.OVERWRITE

text.writeAsText(filePath, WriteMode.OVERWRITE)

这样就在E盘下新建了一个test文件,内容是1到10。

那么如何保存到文件夹中?

text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)

设置并行度为2,这样就存到test文件夹下,两个文件1和2

默认情况下,不设置并行度,会把结果写到一个文件中,如果设置并行度,那么每一个并行度都对应一个输出。

Java

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        List info = new ArrayList<>();
        for(int i = 1;i <=10; i++) {
            info.add(i);
        }
        DataSource data1 = executionEnvironment.fromCollection(info);
        String filePath = "E:/test2";
        data1.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute("JavaDataSetSinkApp");
    }

看完上述内容,你们掌握如何入门ApacheFlink中的Flinksink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!


本文题目:如何入门ApacheFlink中的Flinksink
网页链接:http://csdahua.cn/article/gdehop.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流