Big Data: Pushing Data to Elasticsearch with Spark

Author: 计算机知识

1. Project Dependencies

1 Background

 

This environment was set up on two CentOS 7 cloud hosts.

<properties>
    <spark_version>2.3.1</spark_version>
    <!-- elasticsearch -->
    <elasticsearch.version>5.5.2</elasticsearch.version>
    <fastjson.version>1.2.28</fastjson.version>
    <elasticsearch-hadoop.version>6.3.2</elasticsearch-hadoop.version>
    <elasticsearch-spark.version>5.5.2</elasticsearch-spark.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>${elasticsearch-spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>
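With these dependencies declared, a quick way to confirm that the connector and Spark versions line up is to push a single test document and see whether the write succeeds. The sketch below is a minimal smoke test, not part of the original project: the local[*] master, the 127.0.0.1 node address, and the spark-test/doc index name are all assumptions to be replaced with your own values.

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsSmokeTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("es-smoke-test")
                .setMaster("local[*]")             // assumption: run locally for the test
                .set("es.nodes", "127.0.0.1")      // assumption: replace with your ES node
                .set("es.port", "9200")
                .set("es.index.auto.create", "true");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            Map<String, String> doc = Collections.singletonMap("message", "hello from spark");
            JavaRDD<Map<String, String>> rdd = jsc.parallelize(Collections.singletonList(doc));
            // a version mismatch between spark-core and elasticsearch-spark fails fast here
            JavaEsSpark.saveToEs(rdd, "spark-test/doc");
        }
    }
}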

Spark, a big data processing and computation framework, was created by Matei Zaharia in 2010; it is memory-based and written in Scala.

# cat syslog02.conf 
#filename:syslog02.conf  # note: this line must be commented out with a leading #
input{
    file{
        path => ["/var/log/*.log"]
    }
}
output{
    elasticsearch {
        hosts => ["12x.xx.15.1xx:9200"]
    }
}

The components depend on one another across versions, so check the documentation and download matching versions. Download locations for the required packages:

2. Reading Hadoop HDFS Data with Spark and Pushing It to Elasticsearch

2 Deployment

Check whether the configuration file has any problems:

Software versions

import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// PinyinTool, RDDKeyByCounts, and ESProperties are project classes not shown here
public class PushWordCombination {

    private static PinyinTool tool = new PinyinTool();

    public static void pushDataByLen(SparkContext sc, SparkSession sparkSession, String goodsCategory, Integer len) {
        Dataset<Row> goodsDF1 = sparkSession.read().format("json").json(String.format("/data/app/%s/combination%d.json", goodsCategory, len));
        if (goodsDF1.count() == 0) {
            return;
        }

        // register a UDF that converts a name to lowercase pinyin
        sparkSession.udf().register("pinYin", (String s) -> tool.toPinYin(s, "", PinyinTool.Type.LOWERCASE), DataTypes.StringType);

        Encoder<RDDKeyByCounts> nameKeyEncoder = Encoders.bean(RDDKeyByCounts.class);
        Dataset<RDDKeyByCounts> dataset = goodsDF1.selectExpr("name as name", "counts as counts", String.format("%d as goodsCategory", 0),
                String.format("%d as nameLen", len), "pinYin(name) as pinYin").as(nameKeyEncoder);

        // use the "name" field as the Elasticsearch document id
        JavaEsSpark.saveToEs(dataset.javaRDD(), "goods-category/category", ImmutableMap.of("es.mapping.id", "name"));
    }

    public static void main(String[] args) {
        // configure the Elasticsearch connection
        SparkConf conf = new SparkConf().setAppName("my-app").
                set("es.nodes", ESProperties.IP).
                set("es.port", ESProperties.PORT).
                set("pushdown", ESProperties.PUSH_DOWN).
                set("es.index.auto.create", ESProperties.INDEX_AUTO_CREATE).
                // in this mode the connector disables node discovery and routes every
                // operation, reads and writes alike, through the declared ES nodes
                set("es.nodes.wan.only", "true").
                set("es.net.http.auth.user", ESProperties.SECURITY_USER).
                set("es.net.http.auth.pass", ESProperties.SECURITY_PWD);

        SparkContext sc = new SparkContext(conf);

        SparkSession sparkSession = new SparkSession(sc);

        for (int j = 2; j <= 4; j++) {
            pushDataByLen(sc, sparkSession, "all", j);
        }
        sparkSession.stop();
    }
}
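To confirm the push worked, the connector can read the index back as an RDD of (document id, field map) pairs. The sketch below is a minimal read-back check, assuming the same connection settings as the job above; the local master and node address are placeholders for your ESProperties values.

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class VerifyPush {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("verify-push")
                .setMaster("local[*]")           // placeholder; use your cluster master
                .set("es.nodes", "127.0.0.1")    // placeholder; use ESProperties.IP
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // each pair is (document id, field map); the id is the "name" field,
            // because the job sets es.mapping.id to "name"
            JavaPairRDD<String, Map<String, Object>> docs =
                    JavaEsSpark.esRDD(jsc, "goods-category/category");
            System.out.println("documents indexed: " + docs.count());
            for (scala.Tuple2<String, Map<String, Object>> t : docs.take(5)) {
                System.out.println(t._1() + " -> " + t._2());
            }
        }
    }
}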

2.1 Required packages (download links are in an earlier post)

# ../bin/logstash -f syslog02.conf -t
Sending Logstash's logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2016-12-01T09:54:46,512][FATAL][logstash.runner          ] The given configuration is invalid. Reason: Expected one of #, input, filter, output at line 1, column 1 (byte 1) after 

The check fails because the first line of the file, filename:syslog02.conf, was not commented out; every top-level line in a Logstash config must be a comment or an input, filter, or output section.

· java: jdk-8u91-linux-x64.tar.gz (downloading from the official site can be quite slow)

  Jdk, because the runtime environment is the JVM

Add an index setting under output -> elasticsearch:

· scala: scala-2.11.8.tgz from the official site; Spark 2.0 requires Scala 2.11.x

  Python2.7

# cat syslog02.conf
#filename:syslog02.conf
input{
    file{
        path => ["/var/log/*.log"]
    }
}
output{
    elasticsearch {
        hosts => ["123.57.15.154:9200"]
        index => "syslog02_log"
    }
}

· hadoop: hadoop-2.7.2.tar.gz (the mirror download is much faster)

  Scala2.10.4

Re-test the configuration file:

· spark: spark-2.0.2-bin-hadoop2.7.tgz from the official site

  Spark1.0.1

# ../bin/logstash -f syslog02.conf -t
Sending Logstash's logs to /usr/local/logstash/logs which is now configured via log4j2.properties
Configuration OK
[2016-12-01T11:40:56,791][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
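Once Logstash is started with this configuration (the same command without -t), the new index should appear in Elasticsearch. A minimal way to check from Java, using only the JDK's HTTP classes against the standard _cat/indices API; the host and port below are taken from the output block above.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CheckIndex {
    public static void main(String[] args) throws Exception {
        // host/port from the elasticsearch output section of syslog02.conf
        URL url = new URL("http://123.57.15.154:9200/_cat/indices?v");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // syslog02_log should be listed once Logstash has shipped events
                System.out.println(line);
            }
        }
    }
}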

· IDE: ideaIU-2016.2.4.tar.gz (the Chinese mirror is a bit faster)
