[나의 로직]
- 모든 코인을 검색한다.
- 각 코인의 이전 한시간을 기준으로 돌파계수k를 구한다.
- 매수조건을 통과한 코인 중 어떤 코인을 매수할지 결정한다. 거래량이 많은 것으로 구매한다.
- 만약 돌파점을 통과한다면 매수 -> 2% 이익시 매도
- 만약 -0.1%가 된다면 바로 매도
- 만약 +2%와 -1% 사이에서 10동안 머문다면 매도후 1로 이동
초기에는 -1%시 매도를 실시하였고 1시간동안 제자리라면 매도를 하였지만 백테스트 결과 구매직후 상승을 하지 않으면 하락하는 경우가 많다고 판단하여 10분으로 단축하였고 약간이라도 하락하면 바로 매도하도록 구현하였다.
백테스트 결과 양호한 결과를 보였지만 실제 매도에서는 원하는 가격에 바로 매도가 불가능하기에 큰 이득이 나지는 않았다.
[매수조건]
돌파계수 k는 이전 한시간의 고점과 저점 차의 몇배 상승시 매수를 할지 결정한다
예를 들어 k를 0.6으로 설정하면 이전 고점과저점사이의 간격의 0.6만큼의 금액이 상승하면 상승곡선이라 판단하고 매수를 하는 것이다.
이러한 조건을 충족하는 코인 중 가장 크게 구매율이 상승한 코인을 매수한다.
[매도조건]
나의 매도 조건은 3가지이다.
1. 2%상승
2. 0.1%하락
3. 범위내에서 10분이상 머무름
이 조건을 충족하면 매도를 진행한다.
[백테스트 결과]
2024-03-14와 2024-03-29 데이터를 메인으로 테스트를 하였다.
k=0.6일때가 가장 좋은 결과값을 보였고
한시간동안 머무를때보다 10분이라도 머무르면 매도를 하는 쪽이 결과가 좋았다.
이 데이터들을 바탕으로 수정하였고 로직을 완성하였다.
백테스트시에는 분봉을 기준으로 테스트가 가능하다 그래서 실시간으로 매도가 불가능하기에 2%상승시 매도를 요청하였지만 1분사이에 20퍼이상씩 상승하는 경우가 있다. 이런 경우 때문에 정확한 수익률 계산이 어려웠다.
추세만 확인하는 정도로 사용하였다.
3/14일에는 10퍼센트가량의 성적을 보였다. 하지만 위에서 말한 부분을 고려하면 보수적으로 5퍼센트의 수익이 팔생한다고 볼수있다.
3/29일에는 중간에 폭등한 코인이 있어 200퍼센트의 성적을 보였다. 이부분도 위에서 말한 부분을 고려하면 수익률이 확 낮아지지만 그래도 수익률이 양수 인 것에 의미가 있다고 생각한다.
당분간 이 로직으로 코인거래를 돌리면서 추세를 보려고 한다.
추가적으로 다양한 조건으로 백테스트를 하면서 최적의 값을 찾아보려한다.
아래 코드는 실제 거래가 일어나는 트레이드 코드이다.
trade.py
import time
import pyupbit
import datetime
access = "wbyuJ8ZziemhGZrHHP2fry76hXPDtzjp9ONQWLpJ"
secret = "XdTZixRSmdsbmUERV85cMambFBYo3flmmYaByPvc"
def get_target_price(ticker, k):
"""변동성 돌파 전략으로 매수 목표가 조회"""
"""거래량 변화율 구하기"""
# 1시간전 기록을 기준으로 타켓금액 설정
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=2)
target_price = df.iloc[0]['close'] + (df.iloc[0]['high'] - df.iloc[0]['low']) * k
if(df.iloc[0]['volume']==0):
dv = 0
else:
dv = (df.iloc[1]['volume'] - df.iloc[0]['volume']) / df.iloc[0]['volume']
return target_price, dv
def get_balance(ticker):
"""잔고 조회"""
balances = upbit.get_balances()
for b in balances:
if b['currency'] == ticker:
if b['balance'] is not None:
return float(b['balance'])
else:
return 0
return 0
def get_current_price(ticker):
"""현재가 조회"""
return pyupbit.get_orderbook(ticker=ticker)["orderbook_units"][0]["ask_price"]
def buy(coin):
krw = get_balance("KRW")
if krw > 5000:
print("-------------------buy---------------------")
order_result = upbit.buy_market_order(coin, krw * (1 - fee))
# order_result에서 주문 UUID 추출
order_uuid = order_result['uuid']
time.sleep(1) # API 요청 간에 약간의 지연을 추가
# 주문 정보 조회
order_info = upbit.get_order(order_uuid)
# 체결된 평균 가격을 반환
return order_info['trades'][0]['price'] if order_info['trades'] else None
else:
print("구매 실패: 자금 부족")
return None
def sell(coin):
coin_held = get_balance(coin.split("-")[1])
if coin_held > 0.00008:
order_result = upbit.sell_market_order(coin, coin_held)
print("-------------------sell---------------------")
if 'uuid' in order_result:
order_uuid = order_result['uuid']
time.sleep(1) # API 요청 사이에 약간의 지연을 추가
# 주문 정보 조회
order_info = upbit.get_order(order_uuid)
# 체결된 평균 가격을 반환
if order_info['trades']:
executed_price = float(order_info['trades'][0]['price'])
print("실제 판매 가격:", executed_price)
return executed_price
else:
print("판매 주문이 체결되지 않았습니다.")
return None
else:
print("판매 주문 생성에 실패했습니다.")
return None
else:
print("보유한 코인이 충분하지 않습니다.")
return None
""" 변수 세팅 """
# 거래 수수료
fee = 0.0005
# 타겟 금액 설정 이전기록의 (고점-저점)*k 만큼의 상승이 일어나면 매수
k = 0.6
# 익절 계수: 0.02 -> 2프로 이익시 매도
plus_set = 0.02
# 손절 계수: 0.01 -> 1프로 손해시 매도
minus_set = 0.001
""" 변수 세팅 """
# 초기 상태 설정
is_buy = False
buy_coin = None
buy_price = 0
buy_time = None
coins = pyupbit.get_tickers(fiat="KRW")
profit=1
# 로그인
upbit = pyupbit.Upbit(access, secret)
# 자동매매 시작
print("autotrade start")
while True:
try:
if not is_buy:
max_volume_increase =0
target_coin= None
target_price=0
# 모든 코인 반복물 돌면서 타켓가격을 넘긴 코인중 거래변화량이 가장 큰 코인을 선택
for coin in coins:
# 해당 시간 기준 한시간 전부터 지금까지의 고점,저점을 찾기 -> 돌파기준범위 계산(고점-저가) *k값
# 거래량 증가율 계산 # 거래량 증가률 = (현재거래량-이전분의 거래량)/이전분의 거래량
price, dv = get_target_price(coin, k)
# 현재가격 가져오기
current_price = get_current_price(coin)
# 매수조건을 만족하는 코인중 거래 변화량이 가장 큰 코인 찾기
if price < current_price:
if dv>max_volume_increase:
max_volume_increase = dv
target_coin = coin
target_price = price
if target_coin:
executed_price = buy(target_coin)
if executed_price:
is_buy = True
buy_coin = target_coin
buy_price = float(executed_price)*(1+fee) # 체결 가격을 실수형으로 변환하여 저장
buy_time = datetime.datetime.now()
print("구입 코인: ", buy_coin)
print("구매 가격: ", buy_price )
print("목표 매도가: ", buy_price * (1 + plus_set))
print("목표 손절가: ", buy_price*(1-minus_set))
print("구입 시간: ", buy_time)
print("강제 판매 예정 시간: ", buy_time+datetime.timedelta(minutes=5))
print()
else:
current_price = get_current_price(buy_coin)
current_time = datetime.datetime.now()
# 매도 조건 확인
# 현재 금액이 구입가의 102%이상이면 매도
# 만약 buytime이후 한시간이 지났다면 매도
# 1% 하락하면 매도
if((current_price >= buy_price * (1+plus_set)) or (current_time >= buy_time + datetime.timedelta(minutes=5)) or (current_price <= buy_price*(1-minus_set))):
executed_price = sell(buy_coin)*(1-fee)
if( executed_price ):
print("판매 코인: ", buy_coin)
print("구매 가격: ", buy_price)
print("판매 가격: ", executed_price)
print("목표 매도가: ", buy_price * (1 + plus_set))
print("목표 손절가: ", buy_price*(1-minus_set))
print("판매 시간: ", datetime.datetime.now())
print("강제 판매 예정 시간: ", buy_time+datetime.timedelta(minutes=5))
print("수익률: ", (executed_price-buy_price)/buy_price*100,"%")
profit *= executed_price/buy_price
print("누적 수익률: ", (profit-1)*100,"%")
print()
is_buy = False
buy_coin = None
time.sleep(1)
except Exception as e:
print(e)
# 에러 발생시 수동으로 전량 판매를 해주어야 함
is_buy = False
buy_coin = None
buy_price = 0
buy_time = None
time.sleep(1)
아래 코드는 날짜별 거래 데이터를 수집하는 코드이다. (백테스트 준비과정)
data.py
import time
import pyupbit
import pandas as pd
# 코인 목록 가져오기
coins = pyupbit.get_tickers(fiat="KRW")
# 분봉 데이터를 저장할 빈 DataFrame 초기화
df_minute = pd.DataFrame()
# 각 코인의 분봉 데이터를 가져와서 합치기
for coin in coins:
# OHLCV(open, high, low , close, volume) 당일 시가, 고가, 저가, 종가, 거래량에 대한 데이터
# interval="minute1" 은 분봉, to는 지정 시각까지의 데이터
tempDf = pyupbit.get_ohlcv(coin, interval="minute1", to="2024-03-28", count= 1440)
# API 요청 제한: 분당 600회, 초당 10회 : 자동으로 0.1초 간격으로 호출하지만 코인이 바뀔 때는 수동으로 쉬는시간 줘야됨
time.sleep(0.1)
# tempDf가 None이 아닐 때만 처리
if tempDf is not None:
# 코인 이름 컬럼 추가
tempDf['Coin'] = coin
# 첫 번째 데이터프레임이면 바로 할당, 그렇지 않으면 합치기
if df_minute.empty:
df_minute = tempDf
else:
df_minute = pd.concat([df_minute, tempDf])
else: print("api erorr: 데이터 없음")
# 시간 순으로 정렬
df_minute = df_minute.sort_index()
# 인덱스 리셋 후 'Coin' 컬럼을 두 번째 위치로 이동
df_minute = df_minute.reset_index()
cols = df_minute.columns.tolist()
# 'Coin' 컬럼을 추출하고 나머지 컬럼 순서를 조정
cols = cols[:1] + [cols[-1]] + cols[1:-1]
df_minute = df_minute[cols]
# 결과 출력
print(df_minute)
# 엑셀로 저장
df_minute.to_excel("2024-03-27.xlsx")
아래는 백테스트를 진행하는 코드이다.
backtest.py
# 1. 모든 코인을 검색한다.(o)
# 2. 각 코인의 이전 한시간을 기준으로 k를 구한다.(보류)
# 3. 어떤 코인을 매수할지 결정한다. 거래량이 많은 것으로 구매한다.
# 4. 만약 돌파점을 통과한다면 매수 -> 2% 이익시 매도
# 5. 만약 -1%가 된다면 바로 매도
# 6. 만약 +2%와 -1% 사이에서 5분동안 머문다면 매도후 1로 이동
## 상승장이라면 K 낮게, 하락장은 높게, 배치프로그램 돌려? 또는 수익률이 일정 시간 이상 상승장이면 K를 조금씩 낮춰, 하락이면 k 높여
import pandas as pd
""" 변수 세팅 """
# 테스트 날짜 양식: 2024-02-01 , 날짜.xlsx 파일이 있어야함
test_date = "2024-03-14"
# 거래 수수료
fee = 0.003
# 타겟 금액 설정 이전기록의 (고점-저점)*k 만큼의 상승이 일어나면 매수
k = 0.6
# 타켓 범위 설정 시간 설정 : 1 -> 현재시각과 1시간전 사이의 고점과 저점을 기준으로 계산 , 2면 2시간전
hour_set = 1
# 익절 계수: 0.02 -> 2프로 이익시 매도
plus_set = 0.02
# 손절 계수: 0.01 -> 1프로 손해시 매도
minus_set = 0.001
""" 세팅 """
def calculate_target_range(data, current_time, coin, k, hour_set):
"""
주어진 시간 범위에 대해 고점과 저점을 찾아 k를 곱한 값을 반환합니다.
:param data: 데이터프레임
:param current_time: 현재 시간 인덱스
:param k: 변동성 돌파 계수
:return: 돌파 기준 범위
"""
one_hour_ago = current_time - pd.Timedelta(hours=hour_set)
filtered_data = data[(data['index'] >= one_hour_ago) & (data['index'] <= current_time) & (data['Coin'] == coin)]
high_price = filtered_data['high'].max()
low_price = filtered_data['low'].min()
return (high_price - low_price) * k
# 초기 상태 설정
is_buy = False
buy_coin = None
buy_price = 0
buy_time = None
# 결과를 저장할 데이터프레임
result = []
# 시간순 분봉 데이터 가져오기
df = pd.read_excel(test_date+".xlsx")
# 시간별 데이터 순회
for current_time, group in df.groupby('index'):
## 그룹의 코인수가 50개 이하면 넘어가도록 변형해도 될듯
if not is_buy:
max_volume_increase = 0
target_coin = None
target_price = 0
for _, row in group.iterrows():
# 고점과 저점 차이 계산
# 해당 시간 기준 한시간 전부터 지금까지의 고점,저점을 찾기 -> 돌파기준범위 계산(고점-저가) *k값
target_range = calculate_target_range(df, current_time, row['Coin'], k, hour_set)
# 만약 지금 시간에 거래 조건 충족되면 장바구니에 (코인명, 거래량증가률)넣기
if row['close'] > row['open'] + target_range:
# 거래량 증가율 계산 # 거래량 증가률 = (현재거래량-이전분의 거래량)/이전분의 거래량
volume_increase = (row['volume'] - group['volume'].shift(1).loc[row.name]) / group['volume'].shift(1).loc[row.name]
# 장바구니에서 거래증가률이 제일 많은 코인 구매
if volume_increase > max_volume_increase:
max_volume_increase = volume_increase
target_coin = row['Coin']
target_price = row['open']
buy_time = current_time
# 매수조건을 만족하는 코인이 있다면
if target_coin:
is_buy = True
buy_coin = target_coin
buy_price = target_price
result.append({'time': current_time, 'action': 'buy', 'coin': buy_coin, 'ror(수익률)': 1})
# 장바구니에 아무것도 없다면
else:
result.append({'time': current_time, 'action': 'money_hold', 'coin': None, 'ror(수익률)': 1})
else:
# 매도 조건 확인
# 현재 금액이 구입가의 102%이상이면 매도
# 만약 buytime이후 5분이 지났다면 매도
# 1% 하락하면 매도
try:
row = group[group['Coin'] == buy_coin].iloc[0]
if row['open'] >= buy_price * (1+plus_set) or current_time >= buy_time + pd.Timedelta(minutes=5) or row[
'open'] <= buy_price * (1-minus_set):
is_buy = False
result.append({'time': current_time, 'action': 'sell', 'coin': buy_coin,
'ror(수익률)': 1 + (row['open'] - buy_price) / buy_price - fee})
buy_coin = None
else:
result.append({'time': current_time, 'action': 'coin_hold', 'coin': buy_coin,
'ror(수익률)': 1 })
except:
print("해당 시간에 코인 정보가 없습니다.")
# 결과 데이터프레임 생성 및 엑셀로 저장
result_df = pd.DataFrame(result)
# # 누적 곱 계산(cumprod) => 누적 수익률
result_df['hpr(누적수익률)'] = result_df['ror(수익률)'].cumprod()
file_name = f"{test_date}_backtest_result_k={k}_hour_set={hour_set}_plus_set={plus_set}_minus_set={minus_set}.xlsx"
result_df.to_excel(file_name)
https://github.com/yunzae/coin_trade
GitHub - yunzae/coin_trade
Contribute to yunzae/coin_trade development by creating an account on GitHub.
github.com
'Project > 암호화폐자동매매' 카테고리의 다른 글
서버 실행시 명령어 (0) | 2024.04.01 |
---|
[나의 로직]
- 모든 코인을 검색한다.
- 각 코인의 이전 한시간을 기준으로 돌파계수k를 구한다.
- 매수조건을 통과한 코인 중 어떤 코인을 매수할지 결정한다. 거래량이 많은 것으로 구매한다.
- 만약 돌파점을 통과한다면 매수 -> 2% 이익시 매도
- 만약 -0.1%가 된다면 바로 매도
- 만약 +2%와 -1% 사이에서 10동안 머문다면 매도후 1로 이동
초기에는 -1%시 매도를 실시하였고 1시간동안 제자리라면 매도를 하였지만 백테스트 결과 구매직후 상승을 하지 않으면 하락하는 경우가 많다고 판단하여 10분으로 단축하였고 약간이라도 하락하면 바로 매도하도록 구현하였다.
백테스트 결과 양호한 결과를 보였지만 실제 매도에서는 원하는 가격에 바로 매도가 불가능하기에 큰 이득이 나지는 않았다.
[매수조건]
돌파계수 k는 이전 한시간의 고점과 저점 차의 몇배 상승시 매수를 할지 결정한다
예를 들어 k를 0.6으로 설정하면 이전 고점과저점사이의 간격의 0.6만큼의 금액이 상승하면 상승곡선이라 판단하고 매수를 하는 것이다.
이러한 조건을 충족하는 코인 중 가장 크게 구매율이 상승한 코인을 매수한다.
[매도조건]
나의 매도 조건은 3가지이다.
1. 2%상승
2. 0.1%하락
3. 범위내에서 10분이상 머무름
이 조건을 충족하면 매도를 진행한다.
[백테스트 결과]
2024-03-14와 2024-03-29 데이터를 메인으로 테스트를 하였다.
k=0.6일때가 가장 좋은 결과값을 보였고
한시간동안 머무를때보다 10분이라도 머무르면 매도를 하는 쪽이 결과가 좋았다.
이 데이터들을 바탕으로 수정하였고 로직을 완성하였다.
백테스트시에는 분봉을 기준으로 테스트가 가능하다 그래서 실시간으로 매도가 불가능하기에 2%상승시 매도를 요청하였지만 1분사이에 20퍼이상씩 상승하는 경우가 있다. 이런 경우 때문에 정확한 수익률 계산이 어려웠다.
추세만 확인하는 정도로 사용하였다.
3/14일에는 10퍼센트가량의 성적을 보였다. 하지만 위에서 말한 부분을 고려하면 보수적으로 5퍼센트의 수익이 팔생한다고 볼수있다.
3/29일에는 중간에 폭등한 코인이 있어 200퍼센트의 성적을 보였다. 이부분도 위에서 말한 부분을 고려하면 수익률이 확 낮아지지만 그래도 수익률이 양수 인 것에 의미가 있다고 생각한다.
당분간 이 로직으로 코인거래를 돌리면서 추세를 보려고 한다.
추가적으로 다양한 조건으로 백테스트를 하면서 최적의 값을 찾아보려한다.
아래 코드는 실제 거래가 일어나는 트레이드 코드이다.
trade.py
import time
import pyupbit
import datetime
access = "wbyuJ8ZziemhGZrHHP2fry76hXPDtzjp9ONQWLpJ"
secret = "XdTZixRSmdsbmUERV85cMambFBYo3flmmYaByPvc"
def get_target_price(ticker, k):
"""변동성 돌파 전략으로 매수 목표가 조회"""
"""거래량 변화율 구하기"""
# 1시간전 기록을 기준으로 타켓금액 설정
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=2)
target_price = df.iloc[0]['close'] + (df.iloc[0]['high'] - df.iloc[0]['low']) * k
if(df.iloc[0]['volume']==0):
dv = 0
else:
dv = (df.iloc[1]['volume'] - df.iloc[0]['volume']) / df.iloc[0]['volume']
return target_price, dv
def get_balance(ticker):
"""잔고 조회"""
balances = upbit.get_balances()
for b in balances:
if b['currency'] == ticker:
if b['balance'] is not None:
return float(b['balance'])
else:
return 0
return 0
def get_current_price(ticker):
"""현재가 조회"""
return pyupbit.get_orderbook(ticker=ticker)["orderbook_units"][0]["ask_price"]
def buy(coin):
krw = get_balance("KRW")
if krw > 5000:
print("-------------------buy---------------------")
order_result = upbit.buy_market_order(coin, krw * (1 - fee))
# order_result에서 주문 UUID 추출
order_uuid = order_result['uuid']
time.sleep(1) # API 요청 간에 약간의 지연을 추가
# 주문 정보 조회
order_info = upbit.get_order(order_uuid)
# 체결된 평균 가격을 반환
return order_info['trades'][0]['price'] if order_info['trades'] else None
else:
print("구매 실패: 자금 부족")
return None
def sell(coin):
coin_held = get_balance(coin.split("-")[1])
if coin_held > 0.00008:
order_result = upbit.sell_market_order(coin, coin_held)
print("-------------------sell---------------------")
if 'uuid' in order_result:
order_uuid = order_result['uuid']
time.sleep(1) # API 요청 사이에 약간의 지연을 추가
# 주문 정보 조회
order_info = upbit.get_order(order_uuid)
# 체결된 평균 가격을 반환
if order_info['trades']:
executed_price = float(order_info['trades'][0]['price'])
print("실제 판매 가격:", executed_price)
return executed_price
else:
print("판매 주문이 체결되지 않았습니다.")
return None
else:
print("판매 주문 생성에 실패했습니다.")
return None
else:
print("보유한 코인이 충분하지 않습니다.")
return None
""" 변수 세팅 """
# 거래 수수료
fee = 0.0005
# 타겟 금액 설정 이전기록의 (고점-저점)*k 만큼의 상승이 일어나면 매수
k = 0.6
# 익절 계수: 0.02 -> 2프로 이익시 매도
plus_set = 0.02
# 손절 계수: 0.01 -> 1프로 손해시 매도
minus_set = 0.001
""" 변수 세팅 """
# 초기 상태 설정
is_buy = False
buy_coin = None
buy_price = 0
buy_time = None
coins = pyupbit.get_tickers(fiat="KRW")
profit=1
# 로그인
upbit = pyupbit.Upbit(access, secret)
# 자동매매 시작
print("autotrade start")
while True:
try:
if not is_buy:
max_volume_increase =0
target_coin= None
target_price=0
# 모든 코인 반복물 돌면서 타켓가격을 넘긴 코인중 거래변화량이 가장 큰 코인을 선택
for coin in coins:
# 해당 시간 기준 한시간 전부터 지금까지의 고점,저점을 찾기 -> 돌파기준범위 계산(고점-저가) *k값
# 거래량 증가율 계산 # 거래량 증가률 = (현재거래량-이전분의 거래량)/이전분의 거래량
price, dv = get_target_price(coin, k)
# 현재가격 가져오기
current_price = get_current_price(coin)
# 매수조건을 만족하는 코인중 거래 변화량이 가장 큰 코인 찾기
if price < current_price:
if dv>max_volume_increase:
max_volume_increase = dv
target_coin = coin
target_price = price
if target_coin:
executed_price = buy(target_coin)
if executed_price:
is_buy = True
buy_coin = target_coin
buy_price = float(executed_price)*(1+fee) # 체결 가격을 실수형으로 변환하여 저장
buy_time = datetime.datetime.now()
print("구입 코인: ", buy_coin)
print("구매 가격: ", buy_price )
print("목표 매도가: ", buy_price * (1 + plus_set))
print("목표 손절가: ", buy_price*(1-minus_set))
print("구입 시간: ", buy_time)
print("강제 판매 예정 시간: ", buy_time+datetime.timedelta(minutes=5))
print()
else:
current_price = get_current_price(buy_coin)
current_time = datetime.datetime.now()
# 매도 조건 확인
# 현재 금액이 구입가의 102%이상이면 매도
# 만약 buytime이후 한시간이 지났다면 매도
# 1% 하락하면 매도
if((current_price >= buy_price * (1+plus_set)) or (current_time >= buy_time + datetime.timedelta(minutes=5)) or (current_price <= buy_price*(1-minus_set))):
executed_price = sell(buy_coin)*(1-fee)
if( executed_price ):
print("판매 코인: ", buy_coin)
print("구매 가격: ", buy_price)
print("판매 가격: ", executed_price)
print("목표 매도가: ", buy_price * (1 + plus_set))
print("목표 손절가: ", buy_price*(1-minus_set))
print("판매 시간: ", datetime.datetime.now())
print("강제 판매 예정 시간: ", buy_time+datetime.timedelta(minutes=5))
print("수익률: ", (executed_price-buy_price)/buy_price*100,"%")
profit *= executed_price/buy_price
print("누적 수익률: ", (profit-1)*100,"%")
print()
is_buy = False
buy_coin = None
time.sleep(1)
except Exception as e:
print(e)
# 에러 발생시 수동으로 전량 판매를 해주어야 함
is_buy = False
buy_coin = None
buy_price = 0
buy_time = None
time.sleep(1)
아래 코드는 날짜별 거래 데이터를 수집하는 코드이다. (백테스트 준비과정)
data.py
import time
import pyupbit
import pandas as pd
# 코인 목록 가져오기
coins = pyupbit.get_tickers(fiat="KRW")
# 분봉 데이터를 저장할 빈 DataFrame 초기화
df_minute = pd.DataFrame()
# 각 코인의 분봉 데이터를 가져와서 합치기
for coin in coins:
# OHLCV(open, high, low , close, volume) 당일 시가, 고가, 저가, 종가, 거래량에 대한 데이터
# interval="minute1" 은 분봉, to는 지정 시각까지의 데이터
tempDf = pyupbit.get_ohlcv(coin, interval="minute1", to="2024-03-28", count= 1440)
# API 요청 제한: 분당 600회, 초당 10회 : 자동으로 0.1초 간격으로 호출하지만 코인이 바뀔 때는 수동으로 쉬는시간 줘야됨
time.sleep(0.1)
# tempDf가 None이 아닐 때만 처리
if tempDf is not None:
# 코인 이름 컬럼 추가
tempDf['Coin'] = coin
# 첫 번째 데이터프레임이면 바로 할당, 그렇지 않으면 합치기
if df_minute.empty:
df_minute = tempDf
else:
df_minute = pd.concat([df_minute, tempDf])
else: print("api erorr: 데이터 없음")
# 시간 순으로 정렬
df_minute = df_minute.sort_index()
# 인덱스 리셋 후 'Coin' 컬럼을 두 번째 위치로 이동
df_minute = df_minute.reset_index()
cols = df_minute.columns.tolist()
# 'Coin' 컬럼을 추출하고 나머지 컬럼 순서를 조정
cols = cols[:1] + [cols[-1]] + cols[1:-1]
df_minute = df_minute[cols]
# 결과 출력
print(df_minute)
# 엑셀로 저장
df_minute.to_excel("2024-03-27.xlsx")
아래는 백테스트를 진행하는 코드이다.
backtest.py
# 1. 모든 코인을 검색한다.(o)
# 2. 각 코인의 이전 한시간을 기준으로 k를 구한다.(보류)
# 3. 어떤 코인을 매수할지 결정한다. 거래량이 많은 것으로 구매한다.
# 4. 만약 돌파점을 통과한다면 매수 -> 2% 이익시 매도
# 5. 만약 -1%가 된다면 바로 매도
# 6. 만약 +2%와 -1% 사이에서 5분동안 머문다면 매도후 1로 이동
## 상승장이라면 K 낮게, 하락장은 높게, 배치프로그램 돌려? 또는 수익률이 일정 시간 이상 상승장이면 K를 조금씩 낮춰, 하락이면 k 높여
import pandas as pd
""" 변수 세팅 """
# 테스트 날짜 양식: 2024-02-01 , 날짜.xlsx 파일이 있어야함
test_date = "2024-03-14"
# 거래 수수료
fee = 0.003
# 타겟 금액 설정 이전기록의 (고점-저점)*k 만큼의 상승이 일어나면 매수
k = 0.6
# 타켓 범위 설정 시간 설정 : 1 -> 현재시각과 1시간전 사이의 고점과 저점을 기준으로 계산 , 2면 2시간전
hour_set = 1
# 익절 계수: 0.02 -> 2프로 이익시 매도
plus_set = 0.02
# 손절 계수: 0.01 -> 1프로 손해시 매도
minus_set = 0.001
""" 세팅 """
def calculate_target_range(data, current_time, coin, k, hour_set):
"""
주어진 시간 범위에 대해 고점과 저점을 찾아 k를 곱한 값을 반환합니다.
:param data: 데이터프레임
:param current_time: 현재 시간 인덱스
:param k: 변동성 돌파 계수
:return: 돌파 기준 범위
"""
one_hour_ago = current_time - pd.Timedelta(hours=hour_set)
filtered_data = data[(data['index'] >= one_hour_ago) & (data['index'] <= current_time) & (data['Coin'] == coin)]
high_price = filtered_data['high'].max()
low_price = filtered_data['low'].min()
return (high_price - low_price) * k
# 초기 상태 설정
is_buy = False
buy_coin = None
buy_price = 0
buy_time = None
# 결과를 저장할 데이터프레임
result = []
# 시간순 분봉 데이터 가져오기
df = pd.read_excel(test_date+".xlsx")
# 시간별 데이터 순회
for current_time, group in df.groupby('index'):
## 그룹의 코인수가 50개 이하면 넘어가도록 변형해도 될듯
if not is_buy:
max_volume_increase = 0
target_coin = None
target_price = 0
for _, row in group.iterrows():
# 고점과 저점 차이 계산
# 해당 시간 기준 한시간 전부터 지금까지의 고점,저점을 찾기 -> 돌파기준범위 계산(고점-저가) *k값
target_range = calculate_target_range(df, current_time, row['Coin'], k, hour_set)
# 만약 지금 시간에 거래 조건 충족되면 장바구니에 (코인명, 거래량증가률)넣기
if row['close'] > row['open'] + target_range:
# 거래량 증가율 계산 # 거래량 증가률 = (현재거래량-이전분의 거래량)/이전분의 거래량
volume_increase = (row['volume'] - group['volume'].shift(1).loc[row.name]) / group['volume'].shift(1).loc[row.name]
# 장바구니에서 거래증가률이 제일 많은 코인 구매
if volume_increase > max_volume_increase:
max_volume_increase = volume_increase
target_coin = row['Coin']
target_price = row['open']
buy_time = current_time
# 매수조건을 만족하는 코인이 있다면
if target_coin:
is_buy = True
buy_coin = target_coin
buy_price = target_price
result.append({'time': current_time, 'action': 'buy', 'coin': buy_coin, 'ror(수익률)': 1})
# 장바구니에 아무것도 없다면
else:
result.append({'time': current_time, 'action': 'money_hold', 'coin': None, 'ror(수익률)': 1})
else:
# 매도 조건 확인
# 현재 금액이 구입가의 102%이상이면 매도
# 만약 buytime이후 5분이 지났다면 매도
# 1% 하락하면 매도
try:
row = group[group['Coin'] == buy_coin].iloc[0]
if row['open'] >= buy_price * (1+plus_set) or current_time >= buy_time + pd.Timedelta(minutes=5) or row[
'open'] <= buy_price * (1-minus_set):
is_buy = False
result.append({'time': current_time, 'action': 'sell', 'coin': buy_coin,
'ror(수익률)': 1 + (row['open'] - buy_price) / buy_price - fee})
buy_coin = None
else:
result.append({'time': current_time, 'action': 'coin_hold', 'coin': buy_coin,
'ror(수익률)': 1 })
except:
print("해당 시간에 코인 정보가 없습니다.")
# 결과 데이터프레임 생성 및 엑셀로 저장
result_df = pd.DataFrame(result)
# # 누적 곱 계산(cumprod) => 누적 수익률
result_df['hpr(누적수익률)'] = result_df['ror(수익률)'].cumprod()
file_name = f"{test_date}_backtest_result_k={k}_hour_set={hour_set}_plus_set={plus_set}_minus_set={minus_set}.xlsx"
result_df.to_excel(file_name)
https://github.com/yunzae/coin_trade
GitHub - yunzae/coin_trade
Contribute to yunzae/coin_trade development by creating an account on GitHub.
github.com
'Project > 암호화폐자동매매' 카테고리의 다른 글
서버 실행시 명령어 (0) | 2024.04.01 |
---|