Python
Oracle Cloud Object Storage관련 Python소스 코드
지오준
2021. 3. 3. 23:23
반응형
1. 파일 업로드 : Bucker에 파일 업로드
from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants
import MEBIBYTE
#파일 업로드 콜백 이벤트 생성
def progress_callback(bytes_uploaded):
print("{} additional bytes uploaded".format(bytes_uploaded))
#파일 업로드 기능
def fileupload(filename, content):
try:
print("【파일업로드 시작】:" + str(datetime.datetime.now()))
# Object Storage 설정가져오기
config = oci.config.from_file()
compartment_id = config["tenancy"]
object_storage = oci.object_storage.ObjectStorageClient(config)
# Bucket을 설정한다.
namespace = object_storage.get_namespace().data
bucket_name = "bucket" #Oracle Cloud Object Storage에 등록되어있는 Bucker 이름
object_name = filename
file_ext = os.path.splitext(filename)
if file_ext[1] == ".json": # json파일 내용작성
with open(filename, 'w') as outfile:
outfile.write(content)
elif file_ext[1] == ".txt": # txt파일 내용작성
with open(filename, 'wb') as f:
f.write(content)
elif file_ext[1] == ".html": # html파일 내용작성
with open(filename, 'w') as html:
html.write(content)
# Oci 서버 Bucker에 파일을 업로드한다.
part_size = 2 * MEBIBYTE
upload_manager = UploadManager(object_storage, allow_parallel_uploads=True, parallel_process_count=3)
response = upload_manager.upload_file(namespace, bucket_name, object_name, filename, part_size=part_size, progress_callback=progress_callback)
# 리눅스서버에 업로드된 파일삭제
os.remove(filename)
exceptExceptionas e:
print("【=== 에러내용 ===】:" + str(datetime.datetime.now()))
print("type:" + str(type(e)))
print("args:" + str(e.args))
print("e자신:" + str(e))
print("【=== 에러내용 ===】:" + str(datetime.datetime.now()))
else:
print("【정상적으로 파일업로드가 완료됬습니다.】:" + str(datetime.datetime.now()))
finally:
print("【파일업로드 완료】:" + str(datetime.datetime.now()))
2. 파일 삭제 : Bucker에 업로드 되어 있는 파일 삭제
from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants
#파일삭제기능
def filedelete(filename):
try:
print("【파일삭제 시작】:" + str(datetime.datetime.now()))
# Object Storage 설정가져오기
config = oci.config.from_file()
compartment_id = config["tenancy"]
object_storage = oci.object_storage.ObjectStorageClient(config)
# 파일을 삭제할 Bucket을설정한다.
namespace = object_storage.get_namespace().data
bucket_name = "bucket" #Oracle Cloud Object Storage등록된 Bucker이름
object_name = filename
# Oci 서버 Bucker에 업로드되어 있는 파일 삭제
object_storage.delete_object(namespace, bucket_name, object_name)
except Exceptionas e:
print("【=== 에러내용 ===】:" + str(datetime.datetime.now()))
print("type:" + str(type(e)))
print("args:" + str(e.args))
print("e자신:" + str(e))
else:
print("【정상적으로 파일삭제가 완료됬습니다.】:" + str(datetime.datetime.now()))
finally:
print("【파일삭제 완료】:" + str(datetime.datetime.now()))
3. Bucker 생성 : 새로운 Bucker를 생성
from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants
# 객체 스토리지 작성
def createbucket(bucketname):
try :
print ("[bucket 작성 시작] : "+ str(datetime.datetime.now()))
# Object Storage 설정을 읽습니다.
config = oci.config.from_file()
compartment_id = config["tenancy"]
object_storage = oci.object_storage.ObjectStorageClient(config)
namespace = object_storage.get_namespace().data
#bucket 생성 함수 호출
request = CreateBucketDetails()
request.compartment_id = compartment_id
request.name = bucketname
bucket = object_storage.create_bucket(namespace, request)
except Exceptionas e:
print ("[=== 오류 내용 ===] : "+ str(datetime.datetime.now()))
print ("type : "+ str(type(e)))
print ("args : "+ str(e.args))
print ("e 자신 : "+ str(e))
else:
print ("[정상적으로 bucket 작성이 완료되었습니다] :"+ str(datetime.datetime.now()))
finally:
print ("[bucket 작성 완료] :" + str(datetime.datetime.now()))
4. Bucker 삭제:생성되는 Bucker 제거
from__future__import print_function
import os
import oci
import datetime
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants
# 객체 스토리지(버킷) 삭제 함수
def deletebucket(bucketname):
try :
print("[bucket 삭제 시작] : "+ str(datetime.datetime.now()))
# Object Storage 설정을 읽습니다.
config = oci.config.from_file()
compartment_id = config["tenancy"]
object_storage = oci.object_storage.ObjectStorageClient(config)
# Bucket을 설정한다.
namespace = object_storage.get_namespace().data
# Bucket의 파일리스트가져오기
object_list = object_storage.list_objects(namespace, bucket_name)
# Bucket의 파일삭제
for filelist in object_list.data.objects:
print('Deleting object {}'.format(filelist.name))
object_storage.delete_object(namespace, bucket_name, filelisto.name)
# Oci 서버 Bucker 삭제
object_storage.delete_bucket(namespace, bucket_name)
except Exceptionas e :
print("[=== 오류 내용 ===] :"+ str(datetime.datetime.now()))
print("type :"+ str(type(e)))
print("args :"+ str(e.args))
print("e 자신 :"+ str(e))
else:
print("[정상적으로 bucket 제거가 완료되었습니다] :"+ str(datetime.datetime.now()))
finally:
print("[bucket 제거 완료] :"+ str(datetime.datetime.now()))
5. 파일리스트 가져오기:Bucket에 등록된 파일리스트 가져오기
from __future__ import print_function
import os
import oci
import datetime
import json
from collections import OrderedDict
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants import MEBIBYTE
# 파일리스트 가져오기 함수
def filelist():
try:
print("【파일리스트 가져오기 시작】:" + str(datetime.datetime.now()))
# Object Storage 설정을 읽습니다.
config = oci.config.from_file()
compartment_id = config["tenancy"]
object_storage = oci.object_storage.ObjectStorageClient(config)
# Bucket을 설정한다.
namespace = object_storage.get_namespace().data
bucket_name = "LotoApi_bucket"
# Bucket에 등록된 파일리스트 가져오기
object_list = object_storage.list_objects(namespace, bucket_name)
file_data = OrderedDict()
file_json = []
# Bucket에 등록된 파일리스트를 Json형태로 변환
for filename in object_list.data.objects:
file_json.append({"filename" : str(filename.name)})
file_data["filelist"] = file_json
return file_data
except Exception as e:
errordate = str(datetime.datetime.now())
print("【=== 오류 내용 ===】:" + errordate)
print("type:" + str(type(e)))
print("args:" + str(e.args))
print("e자신:" + str(e))
else:
print("【정상적으로 파일리스트 가져오기가 완료됬습니다.】:" + str(datetime.datetime.now()))
finally:
print("【파일리스트 가져오기 완료】:" + str(datetime.datetime.now()))
6. 파일다운로드:Bucket에 등록된 파일다운로드
import os
import oci
import datetime
from collections import OrderedDict
from oci.object_storage import UploadManager
from oci.object_storage.models import CreateBucketDetails
from oci.object_storage.transfer.constants import MEBIBYTE
# 파일다운로드 함수
def filedownload(filename,downloadpath):
try:
print("【파일다운로드 시작】:" + str(datetime.datetime.now()))
# Object Storage 설정가져오기
config = oci.config.from_file()
compartment_id = config["tenancy"]
object_storage = oci.object_storage.ObjectStorageClient(config)
# Bucket를 설정
namespace = object_storage.get_namespace().data
bucket_name = "LotoApi_bucket"
object_name = filename
# Oci서버의Bucker에 존재하는 파일명과 다운로드폴더를 설정해서 파일을 다운로드
destination_dir = downloadpath.format(object_name)
get_obj = object_storage.get_object(namespace, bucket_name, object_name)
with open(os.path.join(destination_dir,object_name), 'wb') as downloadfile:
for chunk in get_obj.data.raw.stream(1024 * 1024, decode_content=False):
downloadfile.write(chunk)
except Exception as e:
errordate = str(datetime.datetime.now())
print("【=== 에러 내용 ===】:" + errordate)
print("type:" + str(type(e)))
print("args:" + str(e.args))
print("e자신:" + str(e))
else:
print("【정상적으로 파일다운로드가 완료됬습니다.】:" + str(datetime.datetime.now()))
finally:
print("【파일다운로드 완료】:" + str(datetime.datetime.now()))
반응형