src/jdk.incubator.adba/share/classes/jdk/incubator/sql2/Examples.java
author lancea
Sat, 20 Oct 2018 08:17:38 -0400
branch JDK-8188051-branch
changeset 56997 c9cbac2979fb
parent 56832 4f7713e6a308
permissions -rw-r--r--
JDK-8188051-branch October 20 ADBA updates

/*
 * Copyright (c)  2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package jdk.incubator.sql2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.stream.Collector;
import java.util.stream.Collectors;

/**
 * Simple example code using various aspects of ADBA. These do not necessarily
 * demonstrate the best way to use each feature, just one way.
 */
public class Examples {

  // DataSourceFactory
  /**
   * Builds a {@link DataSource} from the factory obtained with
   * {@code DataSourceFactory.newFactory("oracle.database.adba")}, configured
   * with a fixed URL, user name and password.
   *
   * @return the data source produced by the builder
   */
  public DataSource getDataSource() {
    DataSourceFactory factory = DataSourceFactory.newFactory("oracle.database.adba");
    return factory.builder()
            .url("//host.oracle.com:5521/example")
            .username("scott")
            .password("tiger")
            .build();
  }

  // RowCountOperation
  /**
   * Submits a row count operation that inserts the given item into
   * {@code tab}. The object returned by {@code submit()} is not retained.
   *
   * @param session the session the operation is created on
   * @param item supplies the values bound to {@code :id}, {@code :name} and
   *        {@code :answer}
   */
  public void insertItem(Session session, Item item) {
    String insertSql = "insert into tab values (:id, :name, :answer)";
    session.rowCountOperation(insertSql)
            .set("id", item.id(), AdbaType.NUMERIC)
            .set("name", item.name(), AdbaType.VARCHAR)
            .set("answer", item.answer(), AdbaType.NUMERIC)
            .submit();
  }

  // RowOperation
  /**
   * Submits a query for the rows of {@code tab} whose {@code answer} equals
   * {@code correctAnswer}; the collector adds the {@code id} column of each
   * row to the caller-supplied list.
   *
   * NOTE(review): the method returns right after {@code submit()}; presumably
   * the list is filled later, when the operation executes -- confirm.
   *
   * @param ds the data source used to obtain a session
   * @param result the list the ids are added to
   * @param correctAnswer the value bound to {@code :target}
   */
  public void idsForAnswer(DataSource ds, List<Integer> result, int correctAnswer) {
    String query = "select id, name, answer from tab where answer = :target";
    try (Session session = ds.getSession()) {
      session.<List<Integer>>rowOperation(query)
              .set("target", correctAnswer, AdbaType.NUMERIC)
              .collect(() -> result,
                       (ids, current) -> ids.add(current.at("id").get(Integer.class)))
              .submit();
    }
  }

  // RowOperation
  /**
   * Queries {@code tab} for the rows whose {@code answer} column equals the
   * given value and maps every row to an {@link Item}.
   *
   * @param ds the data source used to obtain a session
   * @param answer the value bound to the {@code :target} parameter
   * @return the completion stage of the submitted operation, carrying the
   *         list of collected items
   */
  public CompletionStage<List<Item>> itemsForAnswer(DataSource ds, int answer) {
    try (Session session = ds.getSession()) {
      return session.<List<Item>>rowOperation("select id, name, answer from tab where answer = :target")
              // Bind the caller-supplied answer rather than a fixed constant.
              .set("target", answer, AdbaType.NUMERIC)
              .collect(Collectors.mapping(
                      row -> new Item(row.at("id").get(Integer.class),
                                      row.at("name").get(String.class),
                                      row.at("answer").get(Integer.class)),
                      Collectors.toList()))
              .submit()
              .getCompletionStage();
    }
  }

  // Independent OperationGroup
  /**
   * Inserts every item of the list through member operations of an
   * independent {@code OperationGroup}. All members are submitted first and
   * the group itself is submitted afterwards. When a member's completion
   * stage completes exceptionally, the id of the affected item is printed.
   *
   * @param ds the data source used to obtain a session
   * @param list the items to insert
   */
  public void insertItemsIndependent(DataSource ds, List<Item> list) {
    final String insertSql = "insert into tab values (:id, :name, :answer)";
    try (Session session = ds.getSession();
         OperationGroup<Void, Void> members =
                 session.<Void, Void>operationGroup().independent()) {
      list.forEach(item ->
              members.rowCountOperation(insertSql)
                      .set("id", item.id)
                      .set("name", item.name)
                      .set("answer", item.answer)
                      .submit()
                      .getCompletionStage()
                      .exceptionally(failure -> {
                        System.out.println(item.id);
                        return null;
                      }));
      // The group is submitted only after all of its members.
      members.submit();
    }
  }

  // Held OperationGroup
  /**
   * Inserts every item of the list into {@code tabone} through member
   * operations of an independent {@code OperationGroup} that is submitted
   * before any of its members. When a member's completion stage completes
   * exceptionally, the id of the affected item is printed.
   *
   * @param ds the data source used to obtain a session
   * @param list the items to insert
   */
  public void insertItemsHold(DataSource ds, List<Item> list) {
    String sql = "insert into tabone values (:id, :name, :answer)";
    try (Session session = ds.getSession();
         OperationGroup<Void, Void> group =
                 session.<Void, Void>operationGroup().independent()) {

      // Submit the group before any member is submitted. Each member
      // operation executes immediately upon submission.
      group.submit();
      for (Item elem : list) {
        group.rowCountOperation(sql)
                // Bind each marker that actually appears in the SQL text; the
                // statement has no "elem_" marker.
                .set("id", elem.id)
                .set("name", elem.name)
                .set("answer", elem.answer)
                .submit()
                .getCompletionStage()
                .exceptionally(t -> {
                  System.out.println(elem.id);
                  return null;
                });
      }
    }
  }

  // Parallel, Independent OperationGroup
  /**
   * For every item, looks up the ids of the rows whose {@code answer} matches
   * the item's answer and renames the row with the first such id. Both the
   * query and the update are members of an independent, parallel
   * {@code OperationGroup} that is submitted before its members. The update
   * receives the id as a {@link CompletionStage}. When an update's completion
   * stage completes exceptionally, the id of the affected item is printed.
   *
   * @param list the items whose answers drive the lookups
   * @param ds the data source used to obtain a session
   */
  public void updateListParallel(List<Item> list, DataSource ds) {
    String query = "select id from tab where answer = :answer";
    String update = "update tab set name = :name where id = :id";
    try (Session session = ds.getSession();
         OperationGroup<Object, Object> group = session.operationGroup()
                 .independent()
                 .parallel()) {
      group.submit();
      for (Item elem : list) {
        CompletionStage<Integer> idPromise = group.<List<Integer>>rowOperation(query)
                .set("answer", elem.answer, AdbaType.NUMERIC)
                .collect(Collector.of(
                        () -> new ArrayList<>(),
                        (l, row) -> l.add(row.at("id").get(Integer.class)),
                        (l, r) -> {
                          // A combiner must merge both partial results;
                          // returning only the left list would drop ids.
                          l.addAll(r);
                          return l;
                        }))
                .submit()
                .getCompletionStage()
                // Throws if no row matched, completing the stage exceptionally.
                .thenApply(l -> l.get(0));
        group.rowCountOperation(update)
                .set("id", idPromise)
                .set("name", "the ultimate question")
                .submit()
                .getCompletionStage()
                .exceptionally(t -> {
                  System.out.println(elem.id);
                  return null;
                });
      }
    }
  }

  // TransactionCompletion
  /**
   * Runs a select-for-update followed by an update and then ends the
   * transaction with {@code commitMaybeRollback}. The shared
   * {@code TransactionCompletion} is marked rollback-only from the
   * {@code onError} handler of either operation, and also when the update
   * does not report exactly one row.
   *
   * @param ds the data source used to obtain a session
   */
  public void transaction(DataSource ds) {
    String lockEmployeeSql = "select empno, ename from emp where ename = :1 for update";
    String moveEmployeeSql = "update emp set deptno = :1 where empno = :2";
    try (Session session = ds.getSession(err -> System.out.println("ERROR: " + err.toString()))) {
      TransactionCompletion trans = session.transactionCompletion();

      // The first matching employee number, delivered as a stage so that it
      // can be bound to the update below.
      CompletionStage<Integer> idPromise = session.<Integer>rowOperation(lockEmployeeSql)
              .set("1", "CLARK", AdbaType.VARCHAR)
              .collect(Collectors.collectingAndThen(
                      Collectors.mapping(row -> row.at("empno").get(Integer.class),
                                         Collectors.toList()),
                      empnos -> empnos.get(0)))
              .onError(err -> trans.setRollbackOnly())
              .submit()
              .getCompletionStage();

      session.<Long>rowCountOperation(moveEmployeeSql)
              .set("1", 50, AdbaType.INTEGER)
              .set("2", idPromise, AdbaType.INTEGER)
              .apply(count -> {
                if (count.getCount() == 1L) {
                  return count.getCount();
                }
                trans.setRollbackOnly();
                throw new RuntimeException("updated wrong number of rows");
              })
              .onError(err -> trans.setRollbackOnly())
              .submit();
      // Marking the transaction rollback-only from
      //    .getCompletionStage()
      //    .exceptionally( t -> { trans.setRollbackOnly(); return null; } )
      // instead of onError(...) would be incorrect.
      session.catchErrors();
      session.commitMaybeRollback(trans);
    }
  }

  // RowPublisherOperation
  /**
   * Reads the {@code ename} column of every row of {@code emp} through a
   * {@link Flow.Subscriber} attached to a row publisher operation.
   *
   * The subscriber requests rows in batches of ten and completes a
   * {@link CompletableFuture} with the collected names in {@code onComplete},
   * or exceptionally in {@code onError}. That future is handed to
   * {@code subscribe(...)} together with the subscriber.
   *
   * @param ds the data source used to obtain a session
   * @return the completion stage of the submitted operation
   */
  public CompletionStage<List<String>> rowSubscriber(DataSource ds) {

    String sql = "select empno, ename from emp";
    CompletableFuture<List<String>> result = new CompletableFuture<>();
    final int batchSize = 10;

    Flow.Subscriber<Result.RowColumn> subscriber = new Flow.Subscriber<>() {

      private Flow.Subscription subscription;
      private final List<String> names = new ArrayList<>();
      // Number of requested rows that have not been delivered yet.
      private int demand = 0;

      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Account for the demand before calling request(): a publisher may
        // deliver rows synchronously from inside request(), and onNext must
        // then already see the outstanding count.
        demand += batchSize;
        this.subscription.request(batchSize);
      }

      @Override
      public void onNext(Result.RowColumn column) {
        names.add(column.at("ename").get(String.class));
        if (--demand < 1) {
          // Same ordering as in onSubscribe: bookkeeping first, then request.
          demand += batchSize;
          subscription.request(batchSize);
        }
      }

      @Override
      public void onError(Throwable throwable) {
        result.completeExceptionally(throwable);
      }

      @Override
      public void onComplete() {
        result.complete(names);
      }

    };

    try (Session session = ds.getSession()) {
      return session.<List<String>>rowPublisherOperation(sql)
              .subscribe(subscriber, result)
              .submit()
              .getCompletionStage();
    }
  }

  // ArrayRowCountOperation
  /**
   * Submits an array row count operation that inserts into {@code tab},
   * binding one list to each of the three positional parameters, and sums the
   * counts of the individual results.
   *
   * @param ds the data source used to obtain a session
   * @param ids values bound to parameter {@code "1"}
   * @param names values bound to parameter {@code "2"}
   * @param answers values bound to parameter {@code "3"}
   * @return the completion stage of the submitted operation, carrying the
   *         summed count
   */
  public CompletionStage<Long> arrayInsert(DataSource ds,
                                           List<Integer> ids,
                                           List<String> names,
                                           List<Integer> answers) {
    try (Session session = ds.getSession()) {
      CompletionStage<Long> totalCount =
              session.<Long>arrayRowCountOperation("insert into tab values (?, ?, ?)")
                      .collect(Collectors.summingLong(rowCount -> rowCount.getCount()))
                      .set("1", ids, AdbaType.INTEGER)
                      .set("2", names, AdbaType.VARCHAR)
                      .set("3", answers, AdbaType.INTEGER)
                      .submit()
                      .getCompletionStage();
      return totalCount;
    }
  }

  // ArrayRowCountOperation -- transposed
  /**
   * Same insert as an array row count operation, but starting from a list of
   * {@link Item}s: each positional parameter is bound to the list of the
   * corresponding property extracted from the items.
   *
   * @param ds the data source used to obtain a session
   * @param items the items to insert
   * @return the completion stage of the submitted operation, carrying the
   *         summed count
   */
  public CompletionStage<Long> transposedArrayInsert(DataSource ds, List<Item> items) {
    try (Session session = ds.getSession()) {
      CompletionStage<Long> totalCount =
              session.<Long>arrayRowCountOperation("insert into tab values (?, ?, ?)")
                      .collect(Collectors.summingLong(rowCount -> rowCount.getCount()))
                      .set("1",
                           items.stream().map(Item::id).collect(Collectors.toList()),
                           AdbaType.INTEGER)
                      .set("2",
                           items.stream().map(Item::name).collect(Collectors.toList()),
                           AdbaType.VARCHAR)
                      .set("3",
                           items.stream().map(Item::answer).collect(Collectors.toList()),
                           AdbaType.INTEGER)
                      .submit()
                      .getCompletionStage();
      return totalCount;
    }
  }

  // OutOperation
  /**
   * Calls {@code item_for_id} with the given id as input and two out
   * parameters, and builds an {@link Item} from the id and the returned
   * {@code name} and {@code answer} values.
   *
   * @param ds the data source used to obtain a session
   * @param id the value bound to {@code :id}; also used as the item's id
   * @return the completion stage of the submitted operation, carrying the item
   */
  public CompletionStage<Item> getItem(DataSource ds, int id) {
    try (Session session = ds.getSession()) {
      CompletionStage<Item> itemStage =
              session.<Item>outOperation("call item_for_id(:id, :name, :answer)")
                      .set("id", id, AdbaType.INTEGER)
                      .outParameter("name", AdbaType.VARCHAR)
                      .outParameter("answer", AdbaType.INTEGER)
                      .apply(outputs -> new Item(id,
                                                 outputs.at("name").get(String.class),
                                                 outputs.at("answer").get(Integer.class)))
                      .submit()
                      .getCompletionStage();
      return itemStage;
    }
  }

  // MultiOperation
  // LocalOperation
  // Control Operation Submission Rate
  /**
   * Subscriber that inserts every received {@code byte[]} record through a
   * member operation of an independent {@code OperationGroup}. In
   * {@code onSubscribe} the subscription's {@code request} method is
   * installed as the session's request hook.
   */
  public class RecordSubscriber implements Subscriber<byte[]> {

    private static final String INSERT_SQL = "insert into tab values (@record)";

    private final Session session;
    private OperationGroup<Long, Long> group;

    /**
     * @param ds the data source the session is obtained from
     */
    public RecordSubscriber(DataSource ds) {
      this.session = ds.getSession();
    }

    /** Creates and submits the group, then installs the request hook. */
    @Override
    public void onSubscribe(Subscription subscription) {
      this.group = this.session.<Long, Long>operationGroup()
              .independent()
              .collect(Collectors.summingLong(count -> count));
      this.group.submit();
      this.session.requestHook(subscription::request);
    }

    /** Submits one insert for the received record. */
    @Override
    public void onNext(byte[] item) {
      this.group.<Long>rowCountOperation(INSERT_SQL)
              .set("record", item, AdbaType.VARBINARY)
              .apply(rowCount -> rowCount.getCount())
              .submit();
    }

    /** Closes the group and the session; the throwable is not inspected. */
    @Override
    public void onError(Throwable t) {
      closeResources();
    }

    /** Closes the group and the session. */
    @Override
    public void onComplete() {
      closeResources();
    }

    // Shared by both terminal signals: group first, then session.
    private void closeResources() {
      this.group.close();
      this.session.close();
    }
  }

  // Controlling Session creation rate
  /**
   * Subscriber that inserts every received {@link Item} using a fresh session
   * per item. The data source is built in {@code onSubscribe} with the
   * subscription's {@code request} method as its request hook.
   */
  public class ItemSubscriber implements Subscriber<Item> {

    private final DataSourceFactory factory;
    private DataSource ds;

    /**
     * @param f the factory the data source is built from
     */
    public ItemSubscriber(DataSourceFactory f) {
      this.factory = f;
    }

    /** Builds the data source and wires its request hook to the subscription. */
    @Override
    public void onSubscribe(Subscription subscription) {
      this.ds = this.factory.builder()
              .url("//host.oracle.com:5521/example")
              .username("scott")
              .password("tiger")
              .requestHook(subscription::request)
              .build();
    }

    /** Obtains a session for this item only and inserts the item through it. */
    @Override
    public void onNext(Item item) {
      try (Session perItemSession = this.ds.getSession()) {
        insertItem(perItemSession, item);
      }
    }

    /** Closes the data source; the throwable is not inspected. */
    @Override
    public void onError(Throwable t) {
      closeDataSource();
    }

    /** Closes the data source. */
    @Override
    public void onComplete() {
      closeDataSource();
    }

    // Shared by both terminal signals.
    private void closeDataSource() {
      this.ds.close();
    }
  }

  // SessionProperty
  /**
   * Example {@code SessionProperty} with a single constant,
   * {@link #LANGUAGE}, whose values are strings.
   */
  public enum ExampleSessionProperty implements SessionProperty {
    LANGUAGE;

    private static final String DEFAULT_VALUE = "AMERICAN_AMERICA";

    /** @return {@code String.class} */
    @Override
    public Class<?> range() {
      return String.class;
    }

    /** @return the string {@code "AMERICAN_AMERICA"} */
    @Override
    public Object defaultValue() {
      return DEFAULT_VALUE;
    }

    /** @return always {@code false} */
    @Override
    public boolean isSensitive() {
      return false;
    }

    /**
     * Submits an {@code ALTER SESSION} statement on the group that sets
     * {@code NLS_LANG} to the given value. The value is cast to
     * {@code String} and passed through {@code enquoteIdentifier}.
     *
     * @param group the group the operation is created on
     * @param value the property value
     * @return always {@code true}
     */
    @Override
    public boolean configureOperation(OperationGroup<?, ?> group, Object value) {
      String alterSession = "ALTER SESSION SET NLS_LANG = "
              + group.enquoteIdentifier((String) value, false);
      group.operation(alterSession).submit();
      return true;
    }

  }

  /**
   * Builds a {@link DataSource} from the given factory with
   * {@link ExampleSessionProperty#LANGUAGE} registered as a session property.
   *
   * @param factory the factory providing the builder
   * @return the data source produced by the builder
   */
  public DataSource getDataSource(DataSourceFactory factory) {
    DataSource configured = factory.builder()
            .registerSessionProperty(ExampleSessionProperty.LANGUAGE)
            // or .defaultSessionProperty(ExampleSessionProperty.LANGUAGE, "AMERICAN_AMERICA")
            // or .sessionProperty(ExampleSessionProperty.LANGUAGE, "FRENCHCANADIAN_CANADA")
            .build();
    return configured;
  }

  /**
   * Builds a session with {@link ExampleSessionProperty#LANGUAGE} set to
   * {@code "FRENCH_FRANCE"} and returns the result of calling
   * {@code attach()} on it.
   *
   * @param ds the data source providing the session builder
   * @return the session returned by {@code attach()}
   */
  public Session getSession(DataSource ds) {
    Session attached = ds.builder()
            .property(ExampleSessionProperty.LANGUAGE, "FRENCH_FRANCE")
            .build()
            .attach();
    return attached;
  }

  // Sharding
  // TransactionOutcome
  // Column navigation
  /**
   * Placeholder type used by the column navigation example. The constructor
   * accepts any number of strings and stores none of them.
   */
  private class Name {

    Name(String... args) {
      super();
      // Nothing to keep: the arguments are intentionally discarded.
    }
  }

  /**
   * Placeholder type used by the column navigation example. The constructor
   * accepts any number of strings and stores none of them.
   */
  private class Address {

    Address(String... args) {
      super();
      // Nothing to keep: the arguments are intentionally discarded.
    }
  }

  /**
   * Reads five consecutive string values, starting at the given column and
   * calling {@code next()} before each later read, and passes them to
   * {@link Name}.
   *
   * @param col the column holding the first value
   * @return a new {@code Name} built from the five values
   */
  private Name getName(Result.Column col) {
    // The reads must stay in this order: each next() call precedes its read.
    String title = col.get(String.class);
    String givenName = col.next().get(String.class);
    String middleInitial = col.next().get(String.class);
    String familyName = col.next().get(String.class);
    String suffix = col.next().get(String.class);
    return new Name(title, givenName, middleInitial, familyName, suffix);
  }

  /**
   * Iterates over {@code col.slice(6)}, reads each column as a string and
   * passes the collected values to {@link Address}.
   *
   * @param col the column the slice is taken from
   * @return a new {@code Address} built from the collected values
   */
  private Address getAddress(Result.Column col) {
    List<String> parts = new ArrayList<>();
    col.slice(6).forEach(part -> parts.add(part.get(String.class)));
    return new Address(parts.toArray(new String[0]));
  }

  /**
   * Demonstrates positioning within a row: builds a {@link Name} and two
   * {@link Address} values starting at named columns, then prints every
   * column reached by iterating {@code column.at(-14)}.
   *
   * @param column the row to navigate
   */
  @SuppressWarnings("unused")
  public void columNavigation(Result.RowColumn column) {
    Name name = getName(column.at("name_title"));
    Address street = getAddress(column.at("street_address_line1"));
    Address mailing = getAddress(column.at("mailing_address_line1"));
    // dump the last 14 columns
    column.at(-14).forEach(
            trailing -> System.out.println("trailing column " + trailing.get(String.class)));
  }

  // Error handling
  /**
   * Plain value holder with an id, a name and an answer. The fields are
   * public and mutable; each also has an accessor method of the same name.
   */
  public static class Item {

    public int id;
    public String name;
    public int answer;

    /**
     * @param i the id
     * @param n the name
     * @param a the answer
     */
    public Item(int i, String n, int a) {
      this.id = i;
      this.name = n;
      this.answer = a;
    }

    /** @return the current value of {@link #id} */
    public int id() {
      return this.id;
    }

    /** @return the current value of {@link #name} */
    public String name() {
      return this.name;
    }

    /** @return the current value of {@link #answer} */
    public int answer() {
      return this.answer;
    }
  }

}