
Spring Boot Integration with StarRocks: Quick-Start Demo (Use Cases)

Developer (https://www.devze.com), 2024-08-22 10:29, source: web, author: HBLOGA
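Contents
  • 1. What is StarRocks?
    • Use cases
    • OLAP multidimensional analysis
    • Real-time data warehouse
    • High-concurrency queries
    • Unified analytics
    • Shared-nothing architecture
  • 2. Environment setup
  • 3. Code project
    • Goals
    • pom.xml
    • MySQL connection
    • Stream Load
  • 4. Testing
    • MysqlClient
    • StarRocksStreamLoad
  • 5. References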

    1. What is StarRocks?

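    StarRocks is a next-generation, high-speed MPP (Massively Parallel Processing) database designed for all analytics scenarios. Its goal is to make data analysis simpler and more agile: users can run fast analytics across many scenarios without complex data preprocessing. StarRocks has a simple architecture, a fully vectorized execution engine, and a newly designed CBO (Cost Based Optimizer); according to the project, this makes its queries, especially multi-table joins, much faster than comparable products. StarRocks handles real-time analytics well and can efficiently query data that is updated in real time. It also supports modern materialized views to further accelerate queries. With StarRocks, users can build flat (wide) tables, star schemas, snowflake schemas, and other models. StarRocks is compatible with the MySQL protocol and supports standard SQL, so it is easy to connect to; the system has no external dependencies, is highly available, and is easy to operate. It also works with mainstream BI tools, including Tableau, Power BI, FineBI, and Smartbi. StarRocks is a Linux Foundation project licensed under Apache 2.0; the source code is in the StarRocks GitHub repository (see the StarRocks license). StarRocks (i) links to or calls functions in third-party libraries, whose licenses are in the licenses-binary folder, and (ii) contains third-party code, whose licenses are in the licenses folder.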

    Use cases

    StarRocks covers a range of enterprise analytics needs, including OLAP (Online Analytical Processing) multidimensional analysis, custom reports, real-time analytics, and ad-hoc analysis.

    OLAP multidimensional analysis

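    With StarRocks' MPP framework and vectorized execution engine, users can freely choose a snowflake schema, star schema, wide table, or pre-aggregated model. This suits multidimensional reports that need flexible configuration. Typical business scenarios include: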

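    • User behavior analysis
    • User profiling, tag analysis, and audience segmentation
    • High-dimensional business metric reports
    • Self-service reporting platforms
    • Business issue investigation and analysis
    • Cross-subject business analysis
    • Financial reports
    • System monitoring analysis

    Real-time data warehouse

    StarRocks implements a primary key table that supports real-time updates with fast queries and can synchronize changes from TP (Transaction Processing) databases within seconds, which makes it suitable for building a real-time data warehouse.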
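    To make the primary key table behavior concrete, here is a conceptual in-memory model in plain Java. It is not StarRocks code, and the class `PrimaryKeyTableModel` and its `Op` enum are hypothetical: an upsert for an existing key replaces the stored row and a delete removes it, so the table always holds the latest version of each key, which is what replaying a change stream from a TP database requires.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Conceptual model only (hypothetical, not StarRocks internals):
 * a primary key table keeps exactly one row per key.
 */
class PrimaryKeyTableModel {
    enum Op { UPSERT, DELETE }

    private final Map<Long, String> rows = new LinkedHashMap<>();

    void apply(Op op, long id, String value) {
        if (op == Op.UPSERT) {
            rows.put(id, value);   // the latest version of the key wins
        } else {
            rows.remove(id);       // delete by primary key
        }
    }

    Map<Long, String> snapshot() {
        return new LinkedHashMap<>(rows);
    }

    public static void main(String[] args) {
        PrimaryKeyTableModel table = new PrimaryKeyTableModel();
        table.apply(Op.UPSERT, 1L, "Simon");
        table.apply(Op.UPSERT, 2L, "Alice");
        table.apply(Op.UPSERT, 1L, "Simon v2"); // update of key 1 replaces the old row
        table.apply(Op.DELETE, 2L, null);       // key 2 is removed
        System.out.println(table.snapshot());   // prints {1=Simon v2}
    }
}
```

    In a real table the same effect comes from loading rows that share a primary key; the model only shows the resulting state, not how StarRocks stores or merges it.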

    High-concurrency queries

    Through good data distribution, flexible indexes, and materialized views, StarRocks can serve user-facing analytics. Typical business scenarios include:

    • Advertiser report analysis
    • Channel staff analysis in retail
    • User-facing analytics reports in SaaS products
    • Multi-page dashboard analysis
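    One part of data distribution is hash bucketing, which the demo table declares with `DISTRIBUTED BY HASH(siteid) BUCKETS 10`. The sketch below only illustrates the idea: rows with the same distribution key always map to the same bucket, so a query with an equality filter on that key can, in principle, be answered from a single bucket. CRC32 is an assumption chosen for illustration; StarRocks' actual hash function and its bucket-to-tablet mapping are not modeled here, and `HashBucketDemo` is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Illustration only: maps a distribution key to one of N buckets.
 * CRC32 is an assumption; StarRocks' internal hash function may differ.
 */
class HashBucketDemo {
    static int bucketOf(String key, int buckets) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() is an unsigned 32-bit value held in a long, so the modulo is never negative
        return (int) (crc.getValue() % buckets);
    }

    public static void main(String[] args) {
        int buckets = 10; // same bucket count as "BUCKETS 10" in the demo table
        // keys are hashed as strings here purely for illustration
        for (String siteid : new String[] {"1", "4", "1"}) {
            System.out.println("siteid=" + siteid + " -> bucket " + bucketOf(siteid, buckets));
        }
    }
}
```

    Both rows with `siteid=1` land in the same bucket, which is the property that lets the rows for one key be stored and processed together.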

    Unified analytics

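    • Use a single system for multidimensional analysis, high-concurrency queries, pre-computation, and real-time analytics, which reduces system complexity and the cost of developing and maintaining multiple technology stacks.
    • Use StarRocks to manage both the data lake and the data warehouse: keep workloads with high concurrency and strict freshness requirements in StarRocks, and use External Catalogs and external tables to analyze data in the lake.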

    Shared-nothing architecture (coupled storage and compute)

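    Local storage gives real-time queries lower latency. As a typical MPP database, StarRocks supports a shared-nothing architecture in which the BE (Backend) nodes handle both data storage and computation. Because each BE computes on its local data, data transfer and copying are avoided, which yields very fast queries and analytics. The architecture supports multi-replica storage, which increases the cluster's capacity for high-concurrency queries and keeps data reliable. It is well suited to scenarios that demand the best query performance.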


    2. Environment setup

    Use Docker to set up the simplest possible test environment:

    docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 -itd --name quickstart starrocks/allin1-ubuntu
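    The container needs some time before FE and BE accept connections. A small hypothetical helper (`WaitForStarRocks` is not part of the demo repository) can poll the three published ports before the demo programs run; the port roles in the comments (9030 FE MySQL query port, 8030 FE HTTP port, 8040 BE HTTP port) follow the default StarRocks configuration. An open port only means a process is listening, not that the cluster is ready for queries, so treat this as a rough readiness check.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical helper: waits until the ports published by the quickstart container accept TCP connections.
 * 9030 = FE MySQL query port, 8030 = FE HTTP port, 8040 = BE HTTP port (default configuration).
 */
class WaitForStarRocks {
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused or timed out: nothing is listening yet
        }
    }

    static boolean waitFor(String host, int port, long maxWaitMs) throws InterruptedException {
        long end = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < end) {
            if (isOpen(host, port, 1000)) {
                return true;
            }
            Thread.sleep(2000); // poll every two seconds
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int port : new int[] {9030, 8030, 8040}) {
            boolean up = waitFor("127.0.0.1", port, 120_000);
            System.out.println("port " + port + (up ? " is reachable" : " is NOT reachable"));
        }
    }
}
```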


    3. Code project

    Goals

    • Over the MySQL protocol, create a database and a table, insert data, and query it
    • Load data with Stream Load

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>springboot-demo</artifactId>
            <groupId>com.et</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <artifactId>starrocks</artifactId>
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-autoconfigure</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.48</version>
            </dependency>
            <!-- httpclient also brings in commons-codec, whose Base64 is used by StarRocksStreamLoad -->
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.3</version>
            </dependency>
        </dependencies>
    </project>

    MySQL connection

    /**
    Copyright (c) 2021 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    **/
    package com.et.starrocks.mysql;
    import java.sql.*;
    public class MysqlClient {
        public static void main(String[] args) {
            String host = "172.30.17.1";
            //query_port in fe.conf
            String port = "9030";
            String user = "root";
            //password is empty by default
            String password = "";
            //connect to starrocks
            Connection conn = null;
            try {
                conn = getConn(host, port, user, password, "");
            } catch (Exception e) {
                System.out.println("connect to starrocks failed");
                e.printStackTrace();
                return;
            }
            System.out.println("connect to starrocks successfully");
            //create statement
            Statement stmt = null;
            try {
                stmt = conn.createStatement();
            } catch (SQLException e) {
                System.out.println("create statement failed");
                e.printStackTrace();
                closeConn(conn);
                return;
            }
            System.out.println("create statement successfully");
            //create database
            try {
                stmt.execute("CREATE DATABASE IF NOT EXISTS db_test");
            } catch (SQLException e) {
                System.out.println("create database failed");
                e.printStackTrace();
                closeStmt(stmt);
                closeConn(conn);
                return;
            }
            System.out.println("create database successfully");
            //set db context
            try {
                stmt.execute("USE db_test");
            } catch (SQLException e) {
                System.out.println("set db context failed");
                e.printStackTrace();
                closeStmt(stmt);
                closeConn(conn);
                return;
            }
            System.out.println("set db context successfully");
            //create table
            try {
                stmt.execute("CREATE TABLE IF NOT EXISTS table_test(siteid INT, citycode SMALLINT, pv BIGINT SUM) " +
                        "AGGREGATE KEY(siteid, citycode) " +
                        "DISTRIBUTED BY HASH(siteid) BUCKETS 10 " +
                        "PROPERTIES(\"replication_num\" = \"1\")");
            } catch (Exception e) {
                System.out.println("create table failed");
                e.printStackTrace();
                closeStmt(stmt);
                closeConn(conn);
                return;
            }
            System.out.println("create table successfully");
            //insert data
            try {
                stmt.execute("INSERT INTO table_test values(1, 2, 3), (4, 5, 6), (1, 2, 4)");
            } catch (Exception e) {
                System.out.println("insert data failed");
                e.printStackTrace();
                closeStmt(stmt);
                closeConn(conn);
                return;
            }
            System.out.println("insert data successfully");
            //query data
            try {
                ResultSet result = stmt.executeQuery("SELECT * FROM table_test");
                System.out.println("data queried is :");
                while (result.next()) {
                    int siteid = result.getInt("siteid");
                    int citycode = result.getInt("citycode");
                    long pv = result.getLong("pv"); // pv is BIGINT SUM, so read it as long
                    System.out.println("\t" + siteid + "\t" + citycode + "\t" + pv);
                }
            } catch (Exception e) {
                System.out.println("query data failed");
                e.printStackTrace();
                closeStmt(stmt);
                closeConn(conn);
                return;
            }
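            //drop database (kept commented out in this demo, so db_test is NOT dropped;
            //the "drop database successfully" message below is printed regardless)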
         /*   try {
                stmt.execute("DROP DATABASE IF EXISTS db_test");
            } catch (Exception e) {
                System.out.println("drop database failed");
                e.printStackTrace();
                closeStmt(stmt);
                closeConn(conn);
                return;
            }*/
            System.out.println("drop database successfully");
            closeStmt(stmt);
            closeConn(conn);
        }
        public static Connection getConn(String host, String port, String user, String password, String database) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            String url = "jdbc:mysql://" + host + ":" + port + "/" + database + "?user=" + user + "&password=" + password;
            return DriverManager.getConnection(url);
        }
        public static void closeConn(Connection conn) {
            try {
                conn.close();
                System.out.println("conn closed");
            } catch (Exception e) {
                System.out.println("close conn failed");
                e.printStackTrace();
            }
        }
        public static void closeStmt(Statement stmt) {
            try {
                stmt.close();
                System.out.println("stmt closed");
            } catch (Exception e) {
                System.out.println("close stmt failed");
                e.printStackTrace();
            }
        }
    }
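    `MysqlClient` closes its resources by hand and does not close the `ResultSet` explicitly. A more compact variant, sketched under the same assumptions (FE query port 9030, user `root` with an empty password, `db_test.table_test` already created; adjust the host to your environment), uses try-with-resources and passes the credentials to `DriverManager.getConnection(url, user, password)` instead of embedding them in the URL. With Connector/J 5.1 the driver should register itself through JDBC 4 service loading, so `Class.forName` is not required. The class `MysqlClientTryWithResources` and the helper `jdbcUrl` are illustrative names.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Illustrative variant of MysqlClient: same query, resources closed by try-with-resources.
 * Assumes FE query port 9030, user root with an empty password, and an existing db_test.table_test.
 */
class MysqlClientTryWithResources {
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:mysql://" + host + ":" + port + "/" + database;
    }

    public static void main(String[] args) throws SQLException {
        String url = jdbcUrl("127.0.0.1", 9030, "db_test");
        // Credentials are passed separately, so special characters need no URL escaping.
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT siteid, citycode, pv FROM table_test")) {
            while (rs.next()) {
                // pv is BIGINT, so read it as long
                System.out.println(rs.getInt("siteid") + "\t" + rs.getInt("citycode") + "\t" + rs.getLong("pv"));
            }
        } // ResultSet, Statement and Connection are closed here, in reverse order
    }
}
```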

    Stream Load

    // Copyright (c) 2021 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
    //
    // Licensed to the Apache Software Foundation (ASF) under one
    // or more contributor license agreements.  See the NOTICE file
    // distributed with this work for additional information
    // regarding copyright ownership.  The ASF licenses this file
    // to you under the Apache License, Version 2.0 (the
    // "License"); you may not use this file except in compliance
    // with the License.  You may obtain a copy of the License at
    //
    //   http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing,
    // software distributed under the License is distributed on an
    // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    // KIND, either express or implied.  See the License for the
    // specific language governing permissions and limitations
    // under the License.
    package com.et.starrocks.streamload;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.http.HttpHeaders;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPut;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.DefaultRedirectStrategy;
    import org.apache.http.impl.client.HttpClientBuilder;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    /**
     * This class is a java demo for starrocks stream load
     *
     * The pom.xml dependency:
     *
     *         <dependency>
     *             <groupId>org.apache.httpcomponents</groupId>
     *             <artifactId>httpclient</artifactId>
     *             <version>4.5.3</version>
     *         </dependency>
     *
     * How to use:
     *
     * 1 create a table in starrocks with any mysql client
     *
     * CREATE TABLE `stream_test` (
     *   `id` bigint(20) COMMENT "",
     *   `id2` bigint(20) COMMENT "",
     *   `username` varchar(32) COMMENT ""
     * ) ENGINE=OLAP
     * DUPLICATE KEY(`id`)
     * DISTRIBUTED BY HASH(`id`) BUCKETS 20;
     *
     *
     * 2 change the StarRocks cluster, db, user config in this class
     *
     * 3 run this class, you should see the following output:
     *
     * {
     *     "TxnId": 27,
     *     "Label": "39c25a5c-7000-496e-a98e-348a264c81de",
     *     "Status": "Success",
     *     "Message": "OK",
     *     "NumberTotalRows": 10,
     *     "NumberLoadedRows": 10,
     *     "NumberFilteredRows": 0,
     *     "NumberUnselectedRows": 0,
     *     "LoadBytes": 50,
     *     "LoadTimeMs": 151
     * }
     *
     * Attention:
     *
     * 1 A wrong httpclient version (such as 4.4) may cause shaded.org.apache.http.ProtocolException:
     *   Caused by: shaded.org.apache.http.ProtocolException: Content-Length header already present
     *     at shaded.org.apache.http.protocol.RequestContent.process(RequestContent.java:96)
     *     at shaded.org.apache.http.protocol.ImmutableHttpProcessor.process(ImmutableHttpProcessor.java:132)
     *     at shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:182)
     *     at shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
     *     at shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
     *     at shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
     *
     * 2 If you run this class more than once, the HTTP status code is still 200 (OK), and you will see
     *   the following output:
     *
     * {
     *     "TxnId": -1,
     *     "Label": "39c25a5c-7000-496e-a98e-348a264c81de",
     *     "Status": "Label Already Exists",
     *     "ExistingJobStatus": "FINISHED",
     *     "Message": "Label [39c25a5c-7000-496e-a98e-348a264c81de] has already been used.",
     *     "NumberTotalRows": 0,
     *     "NumberLoadedRows": 0,
     *     "NumberFilteredRows": 0,
     *     "NumberUnselectedRows": 0,
     *     "LoadBytes": 0,
     *     "LoadTimeMs": 0
     * }
     * 3 A response statusCode of 200 does not mean the stream load succeeded; check that the response
     *   body reports "Status": "Success" (with "Message": "OK") before treating the load as done
     */
    public class StarRocksStreamLoad {
        private final static String STARROCKS_HOST = "127.0.0.1";
        private final static String STARROCKS_DB = "db_test";
        private final static String STARROCKS_TABLE = "stream_test";
        private final static String STARROCKS_USER = "root";
        private final static String STARROCKS_PASSWORD = "";
        private final static int STARROCKS_HTTP_PORT = 8040;
        private void sendData(String content) throws Exception {
            final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                    STARROCKS_HOST,
                    STARROCKS_HTTP_PORT,
                    STARROCKS_DB,
                    STARROCKS_TABLE);
            final HttpClientBuilder httpClientBuilder = HttpClients
                    .custom()
                    .setRedirectStrategy(new DefaultRedirectStrategy() {
                        @Override
                        protected boolean isRedirectable(String method) {
                            return true;
                        }
                    });
            try (CloseableHttpClient client = httpClientBuilder.build()) {
                HttpPut put = new HttpPut(loadUrl);
                StringEntity entity = new StringEntity(content, "UTF-8");
                put.setHeader(HttpHeaders.EXPECT, "100-continue");
                put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(STARROCKS_USER, STARROCKS_PASSWORD));
                // the label header is optional, not necessary
                // using a label gives at-most-once semantics: the same label cannot be loaded twice
                put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de1");
                put.setEntity(entity);
                try (CloseableHttpResponse response = client.execute(put)) {
                    String loadResult = "";
                    if (response.getEntity() != null) {
                        loadResult = EntityUtils.toString(response.getEntity());
                    }
                    final int statusCode = response.getStatusLine().getStatusCode();
                    // statusCode 200 just indicates that starrocks be service is ok, not stream load
                    // you should see the output content to find whether stream load is success
                    if (statusCode != 200) {
                        throw new IOException(
                                String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                    }
                    System.out.println(loadResult);
                }
            }
        }
        private String basicAuthHeader(String username, String password) {
            final String tobeEncode = username + ":" + password;
            byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
            return "Basic " + new String(encoded);
        }
        public static void main(String[] args) throws Exception {
            int id1 = 1;
            int id2 = 10;
            String id3 = "Simon";
            int rowNumber = 10;
            String oneRow = id1 + "\t" + id2 + "\t" + id3 + "\n";
            StringBuilder stringBuilder = new StringBuilder();
            for (int i = 0; i < rowNumber; i++) {
                stringBuilder.append(oneRow);
            }
            stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            String loadData = stringBuilder.toString();
            StarRocksStreamLoad starrocksStreamLoad = new StarRocksStreamLoad();
            starrocksStreamLoad.sendData(loadData);
        }
    }
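    As the Javadoc notes, HTTP 200 alone does not prove that the load succeeded; the JSON body has to be checked. The sketch below extracts the top-level `"Status"` field with a regular expression to stay dependency-free. That is a simplification for brevity: with `spring-boot-starter-web` on the classpath, a real JSON parser such as Jackson would be the more robust choice. `StreamLoadResultCheck` is a hypothetical helper; `requireSuccess(loadResult)` could be called right after `loadResult` is read in `sendData`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: checks the JSON body returned by Stream Load.
 * Uses a regex instead of a JSON library for brevity (not a robust JSON parser).
 */
class StreamLoadResultCheck {
    // Requires a quote directly before Status, so "ExistingJobStatus" is not matched.
    private static final Pattern STATUS = Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    static String status(String loadResult) {
        Matcher m = STATUS.matcher(loadResult);
        return m.find() ? m.group(1) : null;
    }

    static void requireSuccess(String loadResult) {
        String status = status(loadResult);
        if (!"Success".equals(status)) {
            throw new IllegalStateException("Stream load did not succeed, Status=" + status + ", body=" + loadResult);
        }
    }

    public static void main(String[] args) {
        String ok = "{\"TxnId\": 2, \"Status\": \"Success\", \"Message\": \"OK\"}";
        String dup = "{\"TxnId\": -1, \"Status\": \"Label Already Exists\", \"ExistingJobStatus\": \"FINISHED\"}";
        System.out.println(status(ok));  // prints Success
        System.out.println(status(dup)); // prints Label Already Exists
        requireSuccess(ok);              // passes silently
        try {
            requireSuccess(dup);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```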

    The code above shows only the key parts; see the repository below for the complete project.

    Repository: https://github.com/Harries/springboot-demo

    4. Testing

    MysqlClient

    Run the main method; the output shows that every step succeeded:

    connect to starrocks successfully
    create statement successfully
    create database successfully
    set db context successfully
    create table successfully
    insert data successfully
    data queried is :
     1 2 7
     4 5 6
    drop database successfully
    stmt closed
    conn closed
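    The first row reads `1 2 7` instead of two separate rows because `table_test` is an AGGREGATE KEY table: rows with the same key `(siteid, citycode)` are merged, and `pv`, declared `BIGINT SUM`, is summed (3 + 4 = 7). The following conceptual model in plain Java (not how StarRocks implements aggregation; `AggregateKeyModel` is an illustrative name) reproduces the queried result from the three inserted rows.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Conceptual model of an AGGREGATE KEY(siteid, citycode) table with pv BIGINT SUM. */
class AggregateKeyModel {
    static Map<List<Integer>, Long> aggregate(int[][] rows) {
        Map<List<Integer>, Long> table = new LinkedHashMap<>();
        for (int[] row : rows) {
            List<Integer> key = Arrays.asList(row[0], row[1]); // (siteid, citycode)
            table.merge(key, (long) row[2], Long::sum);          // SUM aggregation on pv
        }
        return table;
    }

    public static void main(String[] args) {
        // the same three rows as INSERT INTO table_test values(1, 2, 3), (4, 5, 6), (1, 2, 4)
        int[][] inserted = {{1, 2, 3}, {4, 5, 6}, {1, 2, 4}};
        // prints "1	2	7" and then "4	5	6" (tab-separated)
        aggregate(inserted).forEach((k, pv) -> System.out.println(k.get(0) + "\t" + k.get(1) + "\t" + pv));
    }
}
```

    The model fixes the row order by insertion; a real `SELECT` without `ORDER BY` does not guarantee any order.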

    StarRocksStreamLoad

    Run the main method; the output shows that the data was loaded:

    20:51:47.521 [main] DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager - Connection [id: 0][route: {}->http://127.0.0.1:8040] can be kept alive indefinitely
    20:51:47.521 [main] DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager - Connection released: [id: 0][route: {}->http://127.0.0.1:8040][total kept alive: 1; route allocated: 1 of 2; total allocated: 1 of 20]
    {
     "TxnId": 2,
     "Label": "39c25a5c-7000-496e-a98e-348a264c81de1",
     "Status": "Success",
     "Message": "OK",
     "NumberTotalRows": 10,
     "NumberLoadedRows": 10,
     "NumberFilteredRows": 0,
     "NumberUnselectedRows": 0,
     "LoadBytes": 109,
     "LoadTimeMs": 975,
     "BeginTxnTimeMs": 261,
     "StreamLoadPlanTimeMs": 342,
     "ReadDataTimeMs": 0,
     "WriteDataTimeMs": 106,
     "CommitAndPublishTimeMs": 259
    }
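    The response echoes the hard-coded label, which is why a second run returns `Label Already Exists` as described in the Javadoc. One common pattern, sketched here as a hypothetical helper (`StreamLoadLabels.labelFor` and the `batchId` parameter are not part of the demo), is to derive the label from a batch identifier: retrying the same batch reuses the same label, so StarRocks rejects the duplicate, while each new batch gets a new label. `UUID.nameUUIDFromBytes` produces a deterministic, UUID-formatted string similar to the demo's label.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Hypothetical helper: deterministic Stream Load labels per batch.
 * The same (db, table, batchId) always yields the same label, so a retried batch is rejected as a duplicate.
 */
class StreamLoadLabels {
    static String labelFor(String db, String table, long batchId) {
        String seed = db + "." + table + "#" + batchId;
        return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String first = labelFor("db_test", "stream_test", 1L);
        String retry = labelFor("db_test", "stream_test", 1L);
        String next = labelFor("db_test", "stream_test", 2L);
        System.out.println(first.equals(retry)); // prints true: a retry reuses the label
        System.out.println(first.equals(next));  // prints false: a new batch gets a new label
    }
}
```

    In `sendData`, the fixed header value would then become something like `put.setHeader("label", StreamLoadLabels.labelFor(STARROCKS_DB, STARROCKS_TABLE, batchId));`, with `batchId` supplied by the caller.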

    5. References

    Architecture | StarRocks

    Spring Boot Integration with StarRocks: Quick-Start Demo | Harries Blog™
