您的位置:首页 > 博客中心 > 数据库 >

spark连接关系型数据库的几种方法

时间:2022-03-15 10:46

1.使用jdbcRDD的接口:

  1 SparkConf conf = new SparkConf();
  2         conf.setAppName("Simple Application").setMaster("local");
  3         JavaSparkContext jsc = new JavaSparkContext(conf);
  4 
  5         
  6 //1.直接使用jdbcRDD的构造函数
  7 class DbConnection extends AbstractFunction0<Connection> implements
  8         Serializable {
  9     private static final long serialVersionUID = 1L;
 10     private String driverClassName;
 11     private String connectionUrl;
 12     private String userName;
 13     private String password;
 14 
 15     public DbConnection(String driverClassName, String connectionUrl,
 16             String userName, String password) {
 17         this.driverClassName = driverClassName;
 18         this.connectionUrl = connectionUrl;
 19         this.userName = userName;
 20         this.password = password;
 21     }
 22 
 23     @Override
 24     public Connection apply() {
 25         try {
 26             Class.forName(driverClassName);
 27         } catch (ClassNotFoundException e) {
 28         }
 29         Properties properties = new Properties();
 30         properties.setProperty("user", userName);
 31         properties.setProperty("password", password);
 32         Connection connection = null;
 33         try {
 34             connection = DriverManager.getConnection(connectionUrl,
 35                     properties);
 36         } catch (SQLException e) {
 37         }
 38         return connection;
 39     }
 40 }
 41 
 42 class MapResult extends AbstractFunction1<ResultSet, Object[]>
 43         implements Serializable {
 44     private static final long serialVersionUID = 1L;
 45 
 46     public Object[] apply(ResultSet row) {
 47         return JdbcRDD.resultSetToObjectArray(row);
 48     }
 49 }
 50 
 51 String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
 52 String Driver="com.mysql.jdbc.Driver";
 53 String UserName = "root";
 54 String password = "pd";
 55 DbConnection dbConnection = new DbConnection(Driver,
 56         Connection_url, UserName, password);
 57 sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
 58 //lowerBound,upperBound均设置0,where条件就为恒真,这个是个处理技巧
 59 JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(jsc.sc(), dbConnection,
 60         sql, 0, 0, 1, new MapResult(),
 61         ClassManifestFactory$.MODULE$.fromClass(Object[].class));
 62 JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD,
 63         ClassManifestFactory$.MODULE$.fromClass(Object[].class));
 64         
 65         
 66 //另外一种实现:
 67 class DbConnectionFactory implements JdbcRDD.ConnectionFactory {
 68     private static final long serialVersionUID = 1L;
 69     private String driverClassName;
 70     private String connectionUrl;
 71     private String userName;
 72     private String password;
 73     
 74     public Connection getConnection() throws Exception {
 75         Class.forName(driverClassName);
 76         String url = connectionUrl;
 77         Properties properties = new Properties();
 78         properties.setProperty("user", userName);
 79         properties.setProperty("password", password);
 80         return DriverManager.getConnection(url, properties);
 81     }
 82     
 83     public DbConnectionFactory(String driverClassName, String connectionUrl,
 84             String userName, String password) {
 85         this.driverClassName = driverClassName;
 86         this.connectionUrl = connectionUrl;
 87         this.userName = userName;
 88         this.password = password;
 89     }
 90 
 91 }
 92 
 93 String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
 94 sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
 95 DbConnectionFactory ConnectFactory = new DbConnectionFactory(Driver,
 96         Connection_url, UserName, password)
 97 javaRDD = JdbcRDD.create(jsc, new DbConnectionFactory(Driver,
 98                     Connection_url, UserName, password), sql, 0, 0, 1,new Function<ResultSet,Object[]>()
 99             {
100                 private static final long serialVersionUID = 1L;
101                 public Object[] call(ResultSet resultSet)
102                   {
103                     return JdbcRDD.resultSetToObjectArray(resultSet);
104                   }
105             });//直接返回JavaRDD<Object[]>,这个底层调用的是JdbcRDD(SparkContext sc, Function0<Connection> getConnection, String sql, long lowerBound, long upperBound, int numPartitions, Function1<ResultSet, T> mapRow, ClassTag<T> evidence$1)
106 //javaRDD =JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1);//该方法更加简洁,底层调用上面的create(JavaSparkContext paramJavaSparkContext, ConnectionFactory paramConnectionFactory, String paramString, long paramLong1, long paramLong2, int paramInt, Function<ResultSet, T> paramFunction)

 

本类排行

今日推荐

热门手游