Python

Oracle Cloud Object Storage관련 Python소스 코드

지오준 2021. 3. 3.
반응형

1. 파일 업로드 : Bucker에 파일 업로드

from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants 
import MEBIBYTE

#파일 업로드 콜백 이벤트 생성
def progress_callback(bytes_uploaded):
    print("{} additional bytes uploaded".format(bytes_uploaded))

#파일 업로드 기능
def fileupload(filename, content):
    try:
        print("【파일업로드 시작】:" + str(datetime.datetime.now()))
        # Object Storage 설정가져오기
        config = oci.config.from_file()
        compartment_id = config["tenancy"]
        object_storage = oci.object_storage.ObjectStorageClient(config)

        # Bucket을 설정한다.
        namespace = object_storage.get_namespace().data
        bucket_name = "bucket" #Oracle Cloud Object Storage에 등록되어있는 Bucker 이름
        object_name = filename
        
        file_ext = os.path.splitext(filename)

        if file_ext[1] == ".json":    # json파일 내용작성
           with open(filename, 'w') as outfile:
                outfile.write(content)
        elif file_ext[1] == ".txt":  # txt파일 내용작성
            with open(filename, 'wb') as f:
                f.write(content)
        elif file_ext[1] == ".html":  # html파일 내용작성
            with open(filename, 'w') as html:
                html.write(content)       

        # Oci 서버 Bucker에 파일을 업로드한다.
        part_size = 2 * MEBIBYTE  
        upload_manager = UploadManager(object_storage, allow_parallel_uploads=True, parallel_process_count=3)
        response = upload_manager.upload_file(namespace, bucket_name, object_name, filename, part_size=part_size, progress_callback=progress_callback)
        # 리눅스서버에 업로드된 파일삭제
        os.remove(filename)
    exceptExceptionas e:
        print("【=== 에러내용 ===】:" + str(datetime.datetime.now()))
        print("type:" + str(type(e)))
        print("args:" + str(e.args))
        print("e자신:" + str(e))
        print("【=== 에러내용 ===】:" + str(datetime.datetime.now()))
    else:
        print("【정상적으로 파일업로드가 완료됬습니다.】:" + str(datetime.datetime.now()))
    finally:
        print("【파일업로드 완료】:" + str(datetime.datetime.now()))

2. 파일 삭제 : Bucker에 업로드 되어 있는 파일 삭제

from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants 

#파일삭제기능
def filedelete(filename):
    try:
        print("【파일삭제 시작】:" + str(datetime.datetime.now()))

        # Object Storage 설정가져오기
        config = oci.config.from_file()
        compartment_id = config["tenancy"]
        object_storage = oci.object_storage.ObjectStorageClient(config)

        # 파일을 삭제할 Bucket을설정한다.
        namespace = object_storage.get_namespace().data
        bucket_name = "bucket" #Oracle Cloud Object Storage등록된 Bucker이름
        object_name = filename

        # Oci 서버 Bucker에 업로드되어 있는 파일 삭제
        object_storage.delete_object(namespace, bucket_name, object_name)
    except Exceptionas e:
        print("【=== 에러내용 ===】:" + str(datetime.datetime.now()))
        print("type:" + str(type(e)))
        print("args:" + str(e.args))
        print("e자신:" + str(e))
    else:
        print("【정상적으로 파일삭제가 완료됬습니다.】:" + str(datetime.datetime.now()))
    finally:
        print("【파일삭제 완료】:" + str(datetime.datetime.now()))

3. Bucker 생성 : 새로운 Bucker를 생성

from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants 

# 객체 스토리지 작성
def createbucket(bucketname):
    try :
          print ("[bucket 작성 시작] : "+ str(datetime.datetime.now()))
          
          # Object Storage 설정을 읽습니다.
          config = oci.config.from_file()
          compartment_id = config["tenancy"]
          object_storage = oci.object_storage.ObjectStorageClient(config)
          namespace = object_storage.get_namespace().data

          #bucket 생성 함수 호출
          request = CreateBucketDetails()
          request.compartment_id = compartment_id
          request.name = bucketname
          bucket = object_storage.create_bucket(namespace, request)
     except Exceptionas e:
          print ("[=== 오류 내용 ===] : "+ str(datetime.datetime.now()))
          print ("type : "+ str(type(e)))
          print ("args : "+ str(e.args))
          print ("e 자신 : "+ str(e))
     else:
          print ("[정상적으로 bucket 작성이 완료되었습니다] :"+ str(datetime.datetime.now()))
     finally:
          print ("[bucket 작성 완료] :" + str(datetime.datetime.now()))

4. Bucker 삭제:생성되는 Bucker 제거

from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants 

# 객체 스토리지(버킷) 삭제 함수
def deletebucket(bucketname):
    try :
        print("[bucket 삭제 시작] : "+ str(datetime.datetime.now()))
        
        # Object Storage 설정을 읽습니다.
        config = oci.config.from_file()
        compartment_id = config["tenancy"]
        object_storage = oci.object_storage.ObjectStorageClient(config)

        # Bucket을 설정한다.
        namespace = object_storage.get_namespace().data

        # Bucket의 파일리스트가져오기
        object_list = object_storage.list_objects(namespace, bucket_name)

        # Bucket의 파일삭제
        for filelist in object_list.data.objects:
            print('Deleting object {}'.format(filelist.name))
            object_storage.delete_object(namespace, bucket_name, filelisto.name)

        # Oci 서버 Bucker 삭제
        object_storage.delete_bucket(namespace, bucket_name)
    except Exceptionas e :
        print("[=== 오류 내용 ===] :"+ str(datetime.datetime.now()))
        print("type :"+ str(type(e)))
        print("args :"+ str(e.args))
        print("e 자신 :"+ str(e))
    else:
        print("[정상적으로 bucket 제거가 완료되었습니다] :"+ str(datetime.datetime.now()))
    finally:
        print("[bucket 제거 완료] :"+ str(datetime.datetime.now()))

5. 파일리스트 가져오기:Bucket에 등록된 파일리스트 가져오기

from __future__ import print_function
import os
import oci
import datetime
import json
from collections import OrderedDict
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants import MEBIBYTE

# 파일리스트 가져오기 함수
def filelist():
    try:
        print("【파일리스트 가져오기 시작】:" + str(datetime.datetime.now()))    
   
        # Object Storage 설정을 읽습니다.
        config = oci.config.from_file()
        compartment_id = config["tenancy"]
        object_storage = oci.object_storage.ObjectStorageClient(config)

        # Bucket을 설정한다.
        namespace = object_storage.get_namespace().data
        bucket_name = "LotoApi_bucket"

        # Bucket에 등록된 파일리스트 가져오기
        object_list = object_storage.list_objects(namespace, bucket_name)
        file_data = OrderedDict()
        file_json = []    

        # Bucket에 등록된 파일리스트를 Json형태로 변환
        for filename in object_list.data.objects:
            file_json.append({"filename" : str(filename.name)})

        file_data["filelist"] = file_json

        return file_data
    except Exception as e:
        errordate = str(datetime.datetime.now())

        print("【=== 오류 내용 ===】:" + errordate)
        print("type:" + str(type(e)))
        print("args:" + str(e.args))
        print("e자신:" + str(e))
    else:
        print("【정상적으로 파일리스트 가져오기가 완료됬습니다.】:" + str(datetime.datetime.now()))
    finally:
        print("【파일리스트 가져오기 완료】:" + str(datetime.datetime.now()))

6. 파일다운로드:Bucket에 등록된 파일다운로드

import os
import oci
import datetime
from collections import OrderedDict
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants import MEBIBYTE

# 파일다운로드 함수
def filedownload(filename,downloadpath):
    try:
        print("【파일다운로드 시작】:" + str(datetime.datetime.now()))    
        # Object Storage 설정가져오기
        config = oci.config.from_file()
        compartment_id = config["tenancy"]
        object_storage = oci.object_storage.ObjectStorageClient(config)

        # Bucket를 설정
        namespace = object_storage.get_namespace().data
        bucket_name = "LotoApi_bucket"
        object_name = filename

        # Oci서버의Bucker에 존재하는 파일명과 다운로드폴더를 설정해서 파일을 다운로드
        destination_dir = downloadpath.format(object_name) 
        get_obj = object_storage.get_object(namespace, bucket_name, object_name)
        with open(os.path.join(destination_dir,object_name), 'wb') as downloadfile:
            for chunk in get_obj.data.raw.stream(1024 * 1024, decode_content=False):
                downloadfile.write(chunk)

    except Exception as e:
        errordate = str(datetime.datetime.now())

        print("【=== 에러 내용 ===】:" + errordate)
        print("type:" + str(type(e)))
        print("args:" + str(e.args))
        print("e자신:" + str(e))

    else:
        print("【정상적으로 파일다운로드가 완료됬습니다.】:" + str(datetime.datetime.now()))
    finally:
        print("【파일다운로드 완료】:" + str(datetime.datetime.now()))
반응형

댓글