본문 바로가기

카테고리 없음

sqlalchemy -> ORM -- core단

 

 

 

 

orm 은 실제 데이터를 저장하고 관리하는데에는 사용하지 않을 것. -> 느림

그럼 언제 사용? -> 기능적인 부분을 개발(서비스 개발)할 때 사용하자!

sqlalchemy를 사용할건데, core 단에서 어떤 일이 벌어지는지 먼저 알아야 함.

Core : RDBMS 통합 관리/연결/

 

필요한 부분 import  -  sqlalchemy 2.0 기준

import sqlalchemy
 
from sqlalchemy import create_engine, MetaData
from sqlalchemy.schema import Table, Column, ForeignKey
from sqlalchemy.types import Integer, Text
from sqlalchemy.sql import Select, insert, update, delete, join, func

ORM -> SQLAlchemy 

[Core] 

engine  -    Dialect                 -   DB API  - DBMS

             -    Connection Pool  (Class -> obj -> SQL ->)

 

 

engine= create_engine ('sqlite:///:memory:', echo = True)     #엔진생성
 
# cur.execute/many/
con  = engine.connect()                                                          # 연결
 
meta = MetaData()                                                                  #메타데이터 만들기
 
meta.tables
 
Table('USER', meta,                                                               #USER 테이블 생성
      Column('PK', Integer, primary_key=True),
      Column('NAME', Text, nullable= False))
 
 
User = meta.tables['USER']                #SQLAlchemy에서 데이터베이스 테이블에 대한 메타데이터 정보를 활용하여
                                                            해당 테이블에 대한 파이썬 객체를 생성
 
 
print(User)                                                      # 테이블 명 출력
 
 
print(User.insert())
 
 
meta.create_all(engine)                                  #이제서야 lazy connectiong에 의해서 테이블을 만듦.
 
 
meta.drop_all(engine)                                     # 테이블 날림
 
 
meta.remove(User)                                         #테이블도 삭제됨.
                                                          # drop_all 안 하고 이것만 수행하면, db에는 테이블이 남아있고, 객체는 사라진 상태.
 
meta.tables                                                    # 테이블이 남아있지 않음
 
meta.reflect(engine)                      #데이터베이스 스키마를 동적으로 검사하고
                                                        데이터베이스의 테이블, 컬럼, 제약 조건 등과 관련된 정보를 가져옴
 

 

다시 만들자. 

Table('USER', meta,
      Column('PK', Integer, primary_key=True),
      Column('NAME', Text, nullable= False))
 
 
 
User = meta.tables['USER']
 
print(User)                                                               # 테이블 명 출력
 
 
print(User.insert())
 
 
meta.create_all(engine)                                             #이제서야 lazy connectiong에 의해서 테이블을 만듦.
                                          # 지정된 engine과 관련된 데이터베이스에 메타데이터 객체 meta에서 정의한 테이블들을 생성
 
print(User.insert().values(PK=1, NAME = '아무개'))                        # TABLE에 데이터 넣기 
>>> INSERT INTO "USER" ("PK", "NAME") VALUES (:PK, :NAME)
 
print(User.insert().values(PK=1, NAME = '아무개').compile().params)
>>> {'PK': 1, 'NAME': '아무개'}
 
con.execute(User.insert().values(PK=1, NAME = '아무개'))
 
con.execute(User.select()).fetchall()
>>>[(1, '아무개')]

 

새로운 테이블 만들자

Table('ADDRESS', meta,                                              #ADDRESS TABLE 생성
      Column('PK', Integer, primary_key=True), 
      Column('NAME', Text),
      Column('FK', Integer, ForeignKey('USER.PK'))
      )
 
User.columns['PK']
>>> Column('PK', Integer(), table=<USER>, primary_key=True, nullable=False)
 
meta.create_all(engine)                                                #밑 설명을 읽으면, commit을 했으니, db에 반영되었을 것임.
 
Table('TEMP', meta,                                                      #TEMP TABLE 생성
      Column('PK', Integer, primary_key=True),
      Column('NAME', Text),
      Column('FK', Integer, nullable=False)
      )
 
User.columns['PK']
>>>Column('PK', Integer(), table=<USER>, primary_key=True, nullable=False)

 

테이블에 데이터를 넣어보자. USER / ADDRESS / TEMP  총 3개의 테이블이 생성된 걸 확인할 수 있다. 

meta.tables
 
>>>FacadeDict({'USER': Table('USER', MetaData(), Column('PK', Integer(), table=<USER>, primary_key=True, nullable=False), Column('NAME', Text(), table=<USER>, nullable=False), schema=None), 'ADDRESS': Table('ADDRESS', MetaData(), Column('PK', Integer(), table=<ADDRESS>, primary_key=True, nullable=False), Column('NAME', Text(), table=<ADDRESS>), Column('FK', Integer(), ForeignKey('USER.PK'), table=<ADDRESS>), schema=None), 'TEMP': Table('TEMP', MetaData(), Column('PK', Integer(), table=<TEMP>, primary_key=True, nullable=False), Column('NAME', Text(), table=<TEMP>), Column('FK', Integer(), table=<TEMP>, nullable=False), schema=None)})
con.execute(meta.tables['ADDRESS'].insert().values(PK=1, NAME='주소', FK=1))
con.execute(meta.tables['TEMP'].insert().values(PK=1, NAME='가짜', FK=1))

 

여기까지 현재 상태

 

ORM                                                         DBMS

MetaData      

T.USER                                                      USER

T.ADDRESS                                               ADRESS

T.TEMP                                                       TEMP

meta.drop_all(engine) ; ORM ->DBMS

meta.remove(T객체) ; ORM Table 객체

meta.reflect(engine) ; DBMS -> ORM

 

JOIN 

 

각 테이블에서 정보 가져오기

con.execute(Select(meta.tables['USER'],
                   meta.tables['ADDRESS'],
                   meta.tables['TEMP'])).fetchall()
 
>>> [(1, '아무개', 1, '주소', 1, 1, '가짜', 1)]         # 지금까지 테이블에 넣었던 정보들 다 가져옴. 

user 테이블의 pk, user 테이블의 name, temp 테이블의 pk 가져오기

con.execute(Select(meta.tables['USER'].columns['PK'],
                   meta.tables['USER'].c.NAME,
                   meta.tables['TEMP'].c.PK)).fetchall()
 
## 컬럼이 여러 테이블에 중복된 이름으로 정의되어 있거나, SQL 질의에서 컬럼을 명확하게 식별해야 할 때 충돌을 방지하기 위해 'c'를 사용하여 명시적으로 컬럼을 지정
 
>>> [(1, '아무개', 1)]

 

meta.tables를 반복적으로 쓰는 대신 사용하기 쉽게 이름 한 번 정의해주고 

USER = meta.tables['USER']
ADDR = meta.tables['ADDRESS']
TEMP = meta.tables['TEMP']

USER 테이블에서 PK 컬럼이 1인 행을 선택. 

print(USER.select().where(USER.c.PK == 1))          # 'USER' 테이블에서 'PK' 컬럼이 1인 행을 선택
 
>>> SELECT "USER"."PK", "USER"."NAME"
FROM "USER"
WHERE "USER"."PK" = :PK_1

on 절을 만들어준건가?

print(USER.c.PK == TEMP.c.FK)
 
>>> "USER"."PK" = "TEMP"."FK"

ADDRESS 테이블이 USER 테이블의 pk를 참고하고 있기 때문에 join이 잘되는건가? 

print(USER.join(ADDR))
 
>>> "USER" JOIN "ADDRESS" ON "USER"."PK" = "ADDRESS"."FK"

 

print(USER.join(TEMP))                                                         # 에러가 남. ON절이 없기 때문.

 

print(USER.join(TEMP, USER.c.PK == TEMP.c.FK))              #ON 절이 만들어 주면 join이 잘 실행 됨. 
 
 
>>> "USER" JOIN "TEMP" ON "USER"."PK" = "TEMP"."FK"

 

print(Select().select_from(USER.join(ADDR)))
 
>>> SELECT
FROM "USER" JOIN "ADDRESS" ON "USER"."PK" = "ADDRESS"."FK"
print(Select(USER).join(ADDR))
 
>>> SELECT "USER"."PK", "USER"."NAME"
FROM "USER" JOIN "ADDRESS" ON "USER"."PK" = "ADDRESS"."FK"
or_  / and_
from sqlalchemy.sql import or_, and_, between
print(or_(USER.c.PK == 1, USER.c.PK ==2))
 
>>>"USER"."PK" = :PK_1 OR "USER"."PK" = :PK_2
print(USER.c.NAME.like('%어쩌고저쩌고%'))
 
>>>"USER"."NAME" LIKE :NAME_1
print(USER.select().where(USER.c.NAME.like('%어쩌고저쩌고%')).order_by(USER.c.PK))
 
>>>SELECT "USER"."PK", "USER"."NAME"
FROM "USER"
WHERE "USER"."NAME" LIKE :NAME_1
ORDER BY "USER"."PK"
print((USER.c.NAME=='abcd') & (USER.c.PK==1)|(USER.c.PK==2))
 
>>> "USER"."NAME" = :NAME_1 AND "USER"."PK" = :PK_1 OR "USER"."PK" = :PK_2

 

 

 

예제 만들어보자. ppt-144


 


# 이전 기록 삭제 

engine.dispose()
meta.clear()

 

# 메타데이터 새로 생성 

meta = MetaData()
 
len(meta.tables), meta.tables
 
engine = create_engine('sqlite:///:memory:', echo=True)
meta.create_all(engine)
con = engine.connect()

 

테이블 만들기  (에러 나면 테이블 한 개씩 해보자)

#아티스트
Table('T_A', meta, Column('PK', Integer, primary_key=True),
      Column('NAME', Text, nullable=False))
#앨범
Table('T_B', meta, Column('PK', Integer, primary_key=True),
      Column('NAME', Text, nullable=False),
      Column('FK', Integer, ForeignKey('T_A.PK')))
#장르
Table('T_C', meta, Column('PK', Integer, primary_key=True),
      Column('NAME', Text, nullable=False))
#곡
Table('T_D', meta, Column('PK', Integer, primary_key=True),
      Column('NAME', Text, nullable=False),
      Column('FK1', Integer, ForeignKey('T_B.PK'), nullable=False),
      Column('FK2', Integer, ForeignKey('T_B.PK'), nullable=False))
len(meta.tables), meta.tables
 
>>> (4, FacadeDict({'T_A': Table('T_A', MetaData(), Column('PK', Integer(), table=<T_A>, primary_key=True, nullable=False), Column('NAME', Text(), table=<T_A>, nullable=False), schema=None), 'T_B': Table('T_B', MetaData(), Column('PK', Integer(), table=<T_B>, primary_key=True, nullable=False), Column('NAME', Text(), table=<T_B>, nullable=False), Column('FK', Integer(), ForeignKey('T_A.PK'), table=<T_B>), schema=None), 'T_C': Table('T_C', MetaData(), Column('PK', Integer(), table=<T_C>, primary_key=True, nullable=False), Column('NAME', Text(), table=<T_C>, nullable=False), schema=None), 'T_D': Table('T_D', MetaData(), Column('PK', Integer(), table=<T_D>, primary_key=True, nullable=False), Column('NAME', Text(), table=<T_D>, nullable=False), Column('FK1', Integer(), ForeignKey('T_B.PK'), table=<T_D>, nullable=False), Column('FK2', Integer(), ForeignKey('T_B.PK'), table=<T_D>, nullable=False), schema=None)}))
 
meta.create_all(engine)

 

 

 

for name in ['가수1', '가수2', '가수3','가수4']:
    con.execute(meta.tables['T_A'].insert().values(NAME=name))
   
#  => insert into T_A (NAME) VALUES(:name)

사용하기 편하게 이름 바꿔주

A = meta.tables['T_A']
B = meta.tables['T_B']
C = meta.tables['T_C']
D = meta.tables['T_D']

키워드에 따라 검색할 수 있도록 만들자

key = '수1'
con.execute(Select(A.c.PK).where(A.c.NAME.like('%'+key))).fetchone()
 
>>>(1,)

 

 

for key in ['1','2','3','4']:                                                 #INSERT INTO T_B(NAME, FK) VALUES(:name, :fk)  같은 로직. 
    for name in ['앨범1', '앨범2']:
        fk = con.execute(select(A.c.PK).where(
            A.c.NAME.like('%'+key))).fetchone()[0]                

        con.execute(B.insert().values(
            NAME=f'가수{key}_{name}', FK=fk))
 

-> 결과값이 아주 길게 나옴, 

 

잘 되었는지 확인

con.execute(select(A.c.NAME, B.c.NAME).join(B)).fetchall()
 
>>> 
[('가수1', '가수1_앨범1'),
('가수1', '가수1_앨범2'),
('가수2', '가수2_앨범1'),
('가수2', '가수2_앨범2'),
('가수3', '가수3_앨범1'),
('가수3', '가수3_앨범2'),
('가수4', '가수4_앨범1'),
('가수4', '가수4_앨범2')]

 

for name in ['장르1', '장르2', '장르4','장르4']:
    con.execute(meta.tables['T_C'].insert().values(NAME=name))
for row in con.execute(select(B.c.PK)).fetchall():
    # 1,2,3,4,5... FK1
    for key in ['1','2','3','4']:
        for name in ['노래1', '노래2', '노래3', '노래4']:
            fk = con.execute(select(C.c.PK).where(
                A.c.NAME.like('%'+key))).fetchone()[0]

            con.execute(D.insert().values(
                NAME=f'{row[0]}앨범_장르{key}_{name}',
                FK1=row[0], FK2=fk))
            # INSERT INTO T_D(NAME, FK1, FK2)
            # VALUES(:name, :row[0], fk)

-> 이것도 결과값이 아주 길게 나와야 함.

con.execute(Select(func.count(A.c.PK))).fetchall(),\
con.execute(Select(func.count(B.c.PK))).fetchall(),\
con.execute(Select(func.count(C.c.PK))).fetchall(),\
con.execute(Select(func.count(D.c.PK))).fetchall()

 

join

아래 3개의 그림 결과가 다 같게 나오는 것을 확인하자. 

print(select(A.c.NAME, B.c.NAME).where(A.c.PK==B.c.FK))
 
>>> 
SELECT "T_A"."NAME", "T_B"."NAME" AS "NAME_1"
FROM "T_A", "T_B"
WHERE "T_A"."PK" = "T_B"."FK"

 

print(select(A.c.NAME, B.c.NAME).join(B))
print(select(A.c.NAME, B.c.NAME).join(B, A.c.PK==B.c.FK))
 
>>> 
SELECT "T_A"."NAME", "T_B"."NAME" AS "NAME_1"
FROM "T_A" JOIN "T_B" ON "T_A"."PK" = "T_B"."FK"
SELECT "T_A"."NAME", "T_B"."NAME" AS "NAME_1"
FROM "T_A" JOIN "T_B" ON "T_A"."PK" = "T_B"."FK"

 

print(select(A.c.NAME, B.c.NAME).select_from(A.join(B)))
 
>>> SELECT "T_A"."NAME", "T_B"."NAME" AS "NAME_1"
FROM "T_A" JOIN "T_B" ON "T_A"."PK" = "T_B"."FK"

 

 

 

 

다양한 방식으로 결과 도출해보자 

 

con.execute(select(A.c.NAME, B.c.NAME).\
            select_from(A.join(B)).\
            order_by(B.c.PK)).fetchall()
 
>>>
[('가수1', '가수1_앨범1'),
('가수1', '가수1_앨범2'),
('가수2', '가수2_앨범1'),
('가수2', '가수2_앨범2'),
('가수3', '가수3_앨범1'),
('가수3', '가수3_앨범2'),
('가수4', '가수4_앨범1'),
('가수4', '가수4_앨범2')]

 

con.execute(select(A.c.NAME, func.count(B.c.NAME)).\
            select_from(A.join(B)).\
            group_by(A.c.NAME)).fetchall()
 
>>>[('가수1', 2), ('가수2', 2), ('가수3', 2), ('가수4', 2)]
 

************************************************************************

join을 계속 실행하다가 AmbiguousForeignKeysError가 나왔다. 

이때, onclause를설정해주면 된다고 해서 실행해본다.

 

조건 지정 후

# 'T_B'와 'T_D' 테이블 간의 조인 조건 명시적으로 지정
join_condition_B = A.c.NAME == B.c.NAME
join_condition_D = B.c.NAME == D.c.NAME

join 실행

# 가수-앨범-노래
print(select(A.c.NAME, B.c.NAME, D.c.NAME).\
      select_from(A.join(B, onclause = join_condition_B).join(D, onclause=join_condition_D)))
# 위에것은 PF=FK REFRENCE 가정하에
# join(T_A, join(T_B, T_D))

>>> 

SELECT "T_A"."NAME", "T_B"."NAME" AS "NAME_1", "T_D"."NAME" AS "NAME_2" FROM "T_A" JOIN "T_B" ON "T_A"."NAME" = "T_B"."NAME" JOIN "T_D" ON "T_B"."NAME" = "T_D"."NAME"

 

 

아래 나오는 식들도 같은 에러가 나오는데, 위와 같은 식으로 바꿔주면 해결 가능할 것 같다.

그러니, 아래 나오는 식들은 결과값 대신 사용하는 방법에 집중해서 코드를 보자.

con.execute(select(A.c.NAME, B.c.NAME, D.c.NAME).\
    select_from(A.join(B.join(D)))).fetchall()

 

con.execute(select(A.c.NAME, B.c.NAME, func.count(D.c.NAME)).\
    select_from(A.join(B.join(D))).\
    group+by(B.c.PK))\
    .fetchall()

 

con.excute(select(A.c.NAME, B.c.NAME, func.count(D.c.NAME)).\
    select_from(A.join(B.join(D))).\
    where (A.c.NAME.like('%1'))  # 숫자 바꿔보기
    group_by(B.c.PK))\
    .fetchall()

ppt - 146

 

INSERT

 

print(insert(A).values([{'NAME':'아무개1'},{'NAME':'아무개2'}]))
 
>>> INSERT INTO "T_A" ("NAME") VALUES (:NAME_m0), (:NAME_m1)
con.execute(insert(A).values([{'NAME':'아무개1'},{'NAME':'아무개2'}]))
 
>>> <sqlalchemy.engine.cursor.CursorResult at 0x12ea5257c40>

 

 

MULTYPLEJOIN
print(select(A.c.NAME, B.c.NAME).select_from(A.join(B).join(D)))

가수 -앨범 -장르 - 노래

#가수 - 앨범 - 장르 - 노래
print(select(A.c.NAME, B.c.NAME,C.c.NAME,D.c.NAME).\
      select_from(A.join(B).join(D).join(C,C.c,PK==D.c.FK2)))

 

con.excute(select(A.c.NAME, B.c.NAME,C.c.NAME,D.c.NAME).\
      select_from(A.join(B).join(D).join(C,C.c,PK==D.c.FK2))).fetchall()