GeaFlow Task Enhancements: Customizing Stream-Graph Computation Logic via the API

Introduction to the GeaFlow API

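The GeaFlow API is a development interface for advanced users: you write computation jobs directly in Java. Compared with the DSL, the API is more flexible and can express richer functionality and more complex computation logic.
GeaFlow offers two kinds of API, the Graph API and the Stream API: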

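  • Graph API: Graphs are first-class citizens in the GeaFlow framework. GeaFlow currently provides a GraphView-based programming interface for graph computation, covering graph construction, graph computation, and traversal. Two graph types are supported, Static Graph and Dynamic Graph.

    • Static Graph API: the static graph computation API, used for full graph computation or graph traversal.
    • Dynamic Graph API: the dynamic graph computation API. In GeaFlow, GraphView is the data abstraction for dynamic graphs, and dynamic graph computation or traversal runs on top of it. A Snapshot can also be generated from a GraphView; a Snapshot offers the same interface capabilities as the Static Graph API.

  • Stream API: GeaFlow provides a general-purpose computation interface covering source construction, stream and batch computation, and sink output. Two types are supported, Batch and Stream.

    • Batch API: the batch computation API, used for batch computation.
    • Stream API: the stream computation API. In GeaFlow, StreamView is the data abstraction for dynamic streams, and stream computation runs on top of it.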

For more on the API, see https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/api/overview.md

PageRank Example

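This example reads vertices and edges from files to build a graph, runs the PageRank algorithm, and prints each vertex's PageRank value.
The user implements AbstractVcFunc and puts the per-iteration logic in its compute method.
The compute method handles two cases. In the first iteration, each vertex sends its current value, divided by its number of out-edges, to its neighbors. In every later iteration, each vertex sums the messages it received, applies the damping factor alpha (0.85), stores the result as its new value, and sends that value on in the same way. The value held when the last iteration finishes is the final PageRank value; the listing passes 10 to PRAlgorithms as the iteration count.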

public class PageRank {

    private static final Logger LOGGER = LoggerFactory.getLogger(PageRank.class);

    public static final String RESULT_FILE_PATH = "./target/tmp/data/result/pagerank";

    private static final double alpha = 0.85;

    public static void main(String[] args) {
        Environment environment = EnvironmentUtil.loadEnvironment(args);
        IPipelineResult result = PageRank.submit(environment);
        PipelineResultCollect.get(result);
        environment.shutdown();
    }

    public static IPipelineResult submit(Environment environment) {
        Pipeline pipeline = PipelineFactory.buildPipeline(environment);
        Configuration envConfig = environment.getEnvironmentContext().getConfig();
        envConfig.put(FileSink.OUTPUT_DIR, RESULT_FILE_PATH);
        ResultValidator.cleanResult(RESULT_FILE_PATH);

        pipeline.submit((PipelineTask) pipelineTaskCxt -> {
            Configuration conf = pipelineTaskCxt.getConfig();
            PWindowSource<IVertex<Integer, Double>> prVertices =
                pipelineTaskCxt.buildSource(new FileSource<>("email_vertex",
                        line -> {
                            String[] fields = line.split(",");
                            IVertex<Integer, Double> vertex = new ValueVertex<>(
                                Integer.valueOf(fields[0]), Double.valueOf(fields[1]));
                            return Collections.singletonList(vertex);
                        }), AllWindow.getInstance())
                    .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

            PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge",
                    line -> {
                        String[] fields = line.split(",");
                        IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1);
                        return Collections.singletonList(edge);
                    }), AllWindow.getInstance())
                .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

            int iterationParallelism = conf.getInteger(ExampleConfigKeys.ITERATOR_PARALLELISM);
            GraphViewDesc graphViewDesc = GraphViewBuilder
                .createGraphView(GraphViewBuilder.DEFAULT_GRAPH)
                .withShardNum(2)
                .withBackend(BackendType.Memory)
                .build();
            PGraphWindow<Integer, Double, Integer> graphWindow =
                pipelineTaskCxt.buildWindowStreamGraph(prVertices, prEdges, graphViewDesc);

            // Run the vertex-centric computation, then log every resulting vertex.
            graphWindow.compute(new PRAlgorithms(10))
                .compute(iterationParallelism)
                .getVertices()
                .sink(v -> {
                    LOGGER.info("result {}", v);
                })
                .withParallelism(conf.getInteger(ExampleConfigKeys.SINK_PARALLELISM));
        });

        return pipeline.execute();
    }

    public static class PRAlgorithms extends VertexCentricCompute<Integer, Double, Integer, Double> {

        public PRAlgorithms(long iterations) {
            super(iterations);
        }

        @Override
        public VertexCentricComputeFunction<Integer, Double, Integer, Double> getComputeFunction() {
            return new PRVertexCentricComputeFunction();
        }

        @Override
        public VertexCentricCombineFunction<Double> getCombineFunction() {
            return null;
        }

    }

    public static class PRVertexCentricComputeFunction extends AbstractVcFunc<Integer, Double, Integer, Double> {

        @Override
        public void compute(Integer vertexId,
                            Iterator<Double> messageIterator) {
            IVertex<Integer, Double> vertex = this.context.vertex().get();
            List<IEdge<Integer, Integer>> outEdges = this.context.edges().getOutEdges();
            if (this.context.getIterationId() == 1) {
                // First iteration: no messages have arrived yet, so only send
                // an equal share of the initial value to the neighbors.
                if (!outEdges.isEmpty()) {
                    this.context.sendMessageToNeighbors(vertex.getValue() / outEdges.size());
                }
            } else {
                // Later iterations: sum the shares received from neighbors.
                double sum = 0;
                while (messageIterator.hasNext()) {
                    sum += messageIterator.next();
                }
                // Apply the damping factor and store the new rank.
                double pr = sum * alpha + (1 - alpha);
                this.context.setNewVertexValue(pr);

                // Send an equal share of the new rank for the next iteration.
                if (!outEdges.isEmpty()) {
                    this.context.sendMessageToNeighbors(pr / outEdges.size());
                }
            }
        }

    }
}
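
To make the update rule easier to follow in isolation, here is a small framework-free sketch of the same two-case logic in plain Java. It is not GeaFlow code: the class name PageRankSketch and its methods are made up for illustration, and it assumes that from the second iteration on only vertices that received a message are recomputed and send again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PageRankSketch {

    static final double ALPHA = 0.85;

    /**
     * Runs the two-case update rule from the listing on an in-memory graph.
     * edges holds {source, target} pairs; iterations counts the send-only
     * first iteration as iteration 1.
     */
    static Map<Integer, Double> run(Map<Integer, Double> initial, int[][] edges, int iterations) {
        Map<Integer, List<Integer>> outNeighbors = new HashMap<>();
        for (int[] e : edges) {
            outNeighbors.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }

        Map<Integer, Double> values = new HashMap<>(initial);

        // Iteration 1: every vertex only sends value / outDegree.
        Map<Integer, Double> inbox = new HashMap<>();
        for (Integer v : values.keySet()) {
            send(v, values.get(v), outNeighbors, inbox);
        }

        // Iterations 2..n: sum the inbox, damp, store, and send again.
        // Assumption: a vertex with no incoming message is not recomputed.
        for (int step = 2; step <= iterations; step++) {
            Map<Integer, Double> nextInbox = new HashMap<>();
            for (Map.Entry<Integer, Double> received : inbox.entrySet()) {
                double pr = received.getValue() * ALPHA + (1 - ALPHA);
                values.put(received.getKey(), pr);
                send(received.getKey(), pr, outNeighbors, nextInbox);
            }
            inbox = nextInbox;
        }
        return values;
    }

    /** Adds an equal share of value to the inbox of each out-neighbor. */
    private static void send(int vertex, double value,
                             Map<Integer, List<Integer>> outNeighbors,
                             Map<Integer, Double> inbox) {
        List<Integer> targets = outNeighbors.get(vertex);
        if (targets == null) {
            return; // no out-edges, nothing to send
        }
        double share = value / targets.size();
        for (int target : targets) {
            inbox.merge(target, share, Double::sum);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Double> initial = new HashMap<>();
        initial.put(0, 1.0);
        initial.put(1, 1.0);
        initial.put(2, 1.0);
        int[][] edges = {{0, 1}, {0, 2}, {1, 0}, {2, 0}};
        System.out.println(run(initial, edges, 2));
    }
}
```

On the three-vertex graph in main, two iterations give vertex 0 a value of 0.85 * 2 + 0.15 and vertices 1 and 2 a value of 0.85 * 0.5 + 0.15 each. The real job may schedule vertices differently, so treat this as a way to reason about the formula rather than a replacement for the listing.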

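Submitting an API Job

(Container mode, using the PageRank example)

Packaging the Algorithm

In a new project, create a PageRank demo and add the geaflow dependency to the pom: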

<dependency>
    <groupId>com.antgroup.tugraph</groupId>
    <artifactId>geaflow-assembly</artifactId>
    <version>0.2-SNAPSHOT</version>
</dependency>

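Create a PageRank class containing the code above.
Under the project's resources path, create the test data files email_vertex and email_edge. The code reads them from the resources:// resource path to build the graph.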

PWindowSource<IVertex<Integer, Double>> prVertices =
    pipelineTaskCxt.buildSource(new FileSource<>("email_vertex",
            line -> {
                String[] fields = line.split(",");
                IVertex<Integer, Double> vertex = new ValueVertex<>(
                    Integer.valueOf(fields[0]), Double.valueOf(fields[1]));
                return Collections.singletonList(vertex);
            }), AllWindow.getInstance())
        .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

PWindowSource<IEdge<Integer, Integer>> prEdges =
    pipelineTaskCxt.buildSource(new FileSource<>("email_edge",
            line -> {
                String[] fields = line.split(",");
                IEdge<Integer, Integer> edge = new ValueEdge<>(
                    Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1);
                return Collections.singletonList(edge);
            }), AllWindow.getInstance())
        .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

email_vertex

0,1
1,1
2,1
3,1
4,1
5,1
6,1
7,1
8,1
9,1

email_edge

4,3
0,1
2,3
4,6
2,4
6,8
0,2
4,8
0,5
0,7
0,8
9,0
7,0
7,1
7,2
9,5
3,0
7,4
5,3
7,5
1,0
5,4
9,8
3,4
7,9
3,7
3,8
1,6
8,0
6,0
6,2
8,5
4,2
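
Before packaging, it can help to check that the edge file parses the way the job expects. The sketch below is plain Java with no GeaFlow dependency; EdgeFileCheck and outDegrees are hypothetical names. It assumes the same "source,target" line format shown above and counts the out-edges of each source vertex, which is the divisor the compute method uses when it splits a vertex's value among its neighbors.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EdgeFileCheck {

    /** Counts out-edges per source vertex from "source,target" lines. */
    static Map<Integer, Integer> outDegrees(List<String> lines) {
        Map<Integer, Integer> degrees = new TreeMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate blank lines
            }
            String[] fields = trimmed.split(",");
            if (fields.length != 2) {
                throw new IllegalArgumentException("Expected source,target but got: " + line);
            }
            int source = Integer.parseInt(fields[0].trim());
            Integer.parseInt(fields[1].trim()); // fail early on a malformed target id
            degrees.merge(source, 1, Integer::sum);
        }
        return degrees;
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path to the edge file, for example the email_edge file.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(outDegrees(lines));
    }
}
```

For the email_edge data above, vertex 7 appears as a source six times (7,0 7,1 7,2 7,4 7,5 7,9), so its value is split six ways in each iteration.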

Package with Maven; the algorithm jar is written to the target directory.

mvn clean install

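Creating an HLA Graph Task

In the GeaFlow Console, create a new graph task and choose "HLA" as the task type. Upload the jar (or select an existing one) and set entryClass to the class that contains the algorithm's main function. Click "Submit" to create the task.

Submitting the Job

Click "Publish" to open the job detail page, then click "Submit" to submit the job.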

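Viewing the Results

In the container, go to the /tmp/logs/task/ directory and open the log of the corresponding job. The log contains the final PageRank value computed for each vertex.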

2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:0, value:1.5718675107490019)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:1, value:0.5176947080197076)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:2, value:1.0201253300467092)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:3, value:1.3753756869824914)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:4, value:1.4583114077692536)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:5, value:1.1341668910561529)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:6, value:0.6798184364673463)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:7, value:0.70935427506243)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:8, value:1.2827529511906106)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:9, value:0.2505328026562969)
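
One quick sanity check on output like this is to pull the values out of the log and add them up. The sketch below is plain Java; ResultLogCheck and its methods are hypothetical names, and the regular expression assumes the exact "ValueVertex(vertexId:N, value:X)" layout shown in the lines above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ResultLogCheck {

    // Matches the "vertexId:0, value:1.57...)" part of a result line.
    private static final Pattern RESULT =
        Pattern.compile("vertexId:(\\d+), value:([-+0-9.eE]+)\\)");

    /** Extracts vertex id -> PageRank value from log lines; other lines are ignored. */
    static Map<Integer, Double> parse(List<String> logLines) {
        Map<Integer, Double> ranks = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = RESULT.matcher(line);
            if (m.find()) {
                ranks.put(Integer.parseInt(m.group(1)), Double.parseDouble(m.group(2)));
            }
        }
        return ranks;
    }

    /** Sums all parsed PageRank values. */
    static double total(Map<Integer, Double> ranks) {
        double sum = 0;
        for (double value : ranks.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path to a job log copied out of the container.
        Map<Integer, Double> ranks = parse(Files.readAllLines(Paths.get(args[0])));
        System.out.println("vertices: " + ranks.size() + ", total: " + total(ranks));
    }
}
```

The ten values logged above add up to 10, the number of vertices, up to floating-point rounding. That is what this formula should give on the sample data, where every vertex starts at 1 and has at least one out-edge, so each iteration only redistributes the total.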

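Run details are also available on the job detail page.

That completes implementing and running an API task with GeaFlow. It takes only a few steps, so give it a try!

GeaFlow (brand name TuGraph-Analytics) is now officially open source, and we welcome your attention.
Stars are welcome on GitHub 👉 https://github.com/TuGraph-family/tugraph-analytics
For more content, follow our blog at https://geaflow.github.io/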
