티스토리 뷰

반응형

[Python] ADB와 다중 계정 앱을 활용한 안드로이드 자동화 스크립트 (handle_device.py)

안녕하세요! 오늘은 **ADB(Android Debug Bridge)**와 Python을 활용해 안드로이드 장치에서 다중 계정 앱을 제어하고, 파일 업로드 및 삭제, 저장 작업을 자동화하는 스크립트를 소개합니다. 이 코드는 handle_device.py라는 이름으로, 이전에 소개한 메인 스크립트와 연동되어 개별 장치 작업을 처리하는 역할을 합니다.

안녕하세요, 오늘은 Python을 활용해 여러 안드로이드 장치를 동시에 관리할 수 있는 스크립트를 소개하려고 합니다. 이 코드는 **ADB(Android Debug Bridge)**를 사용해 연결된 장치의 시리얼 번호를 확인하고, 엑셀 파일에서 데이터를 읽어 특정 작업을 수행하는 기능을 포함하고 있습니다. 특히, 다중 프로세스와 subprocess를 활용해 병렬 처리가 가능하다는 점이 특징입니다.

코드 개요

이 스크립트는 다음과 같은 기능을 수행합니다:

  1. ADB로 연결된 장치 목록 가져오기: adb devices 명령어를 통해 연결된 안드로이드 장치의 시리얼 번호를 추출합니다.
  2. 엑셀 파일 읽기: 특정 엑셀 파일(계정현황.xlsx)에서 데이터를 읽어 장치 시리얼 번호와 매핑합니다.
  3. 작업 분배: start, delete, save, both 등의 작업을 인자로 받아 각 장치에 대해 병렬로 실행합니다.
  4. 외부 스크립트 실행: handle_device.py라는 별도의 파이썬 파일을 호출해 개별 장치 작업을 처리합니다.

코드 전체

아래는 실제 코드입니다. 공개 소스로 사용하기 위해 개인 식별 정보(ID, 비밀번호, 시리얼 번호 등)는 예시 값으로 대체되었습니다.

import subprocess
import logging
import pandas as pd
import os
import re
import time
import random
from collections import defaultdict
import urllib.parse
from datetime import datetime
import sys

delivery_fee = 3000

# 현재 스크립트 파일의 경로를 글로벌 변수로 설정
current_dir = os.path.dirname(os.path.realpath(__file__))  # 현재 스크립트 파일의 경로
phone_picture_path = "/sdcard/Pictures/"

ipmonsterId='2222222222'
ipmonsterPw='111111**'



def parse_item_excel():# /jobs/ 폴더 내의 a.xlsx 경로 생성   
    xls_file = os.path.join(current_dir, 'main\양식', '물건양식엑셀.xlsx')    
    df = pd.read_excel(xls_file, header=None)  
    return df

item_infos = parse_item_excel()
     

class AuthenticationError(Exception):
    """본인인증 또는 지역인증 관련 에러"""
    pass

class UploadLimitError(Exception):
    """글 작성 제한 에러"""
    pass
class ProgramExecutionError(Exception):
    """프로그램 진행 중 발생한 오류"""
    pass

class ClickFailureError(Exception):
    """클릭 실패 오류"""
    pass

class ClickFailureErrorRetry(Exception):
    """클릭 실패 오류 재시도저장"""
    pass

class LoadingTimeoutError(Exception):
    """로딩이 너무 오래 걸리는 오류"""
    pass

class CloneUpload:
    def __init__(self, device_id, cloneapps, floor, work_type):
        self.device_id = str(device_id)
        self.cloneapps = cloneapps 
        self.floor = 0
        self.phone = 0   
        self.work_type = work_type
        self.app = 0  
        self.jpg_dict = []        
        self.retry = 0 
        self.xml_dump = ''
        match = re.match(r'(\d+)Floor(\d+)', floor)
        if match:
            self.floor = int(match.group(1))  
            self.phone = int(match.group(2))            
        self.setup_logging()

    def setup_logging(self):
        """ 오류 로그를 error.txt로 저장 """
        self.logger = logging.getLogger('CloneUpload')
        handler = logging.FileHandler('error.txt')
        handler.setLevel(logging.ERROR)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_message(self, msg):
        """ 현재 날짜/시간을 포함한 로그 파일 생성 후 메시지 저장 """
        log_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log')
        if not os.path.exists(log_folder):
            os.makedirs(log_folder)
        now = datetime.now()
        time_str = now.strftime('%H:%M:%S')
        filename = os.path.join(log_folder, f'log{now.strftime("%Y%m%d")}.txt')
        log_entry = f"{time_str} - Phone: {self.phone} , App: {self.app} , Serial: {self.device_id} : -- {msg}"
        
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')

    def error_message(self, msg):
        """ 현재 날짜/시간을 포함한 로그 파일 생성 후 메시지 저장 """
        error_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'error')
        if not os.path.exists(error_folder):
            os.makedirs(error_folder)
        now = datetime.now()
        time_str = now.strftime('%H:%M:%S')
        filename = os.path.join(error_folder, f'error{now.strftime("%Y%m%d")}.txt')
        log_entry = f"{time_str} - Phone: {self.phone} , App: {self.app} , Serial: {self.device_id} : -- {msg}"
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')

    def save_result_message(self, msg):
        """ 현재 날짜/시간을 포함한 로그 파일 생성 후 메시지 저장 """
        save_result_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'result')
        if not os.path.exists(save_result_folder):
            os.makedirs(save_result_folder)
        now = datetime.now()
        time_str = now.strftime('%H:%M:%S')
        filename = os.path.join(save_result_folder, f'save_result{now.strftime("%Y%m%d")}.txt')
        msg_str = "\n".join([f"{phone}번폰의 {app}번앱" for phone, app in msg])
        log_entry = f"{time_str} 작성 완료 실패목록 :\n{msg_str}"
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(log_entry + '\n')            

    def press_home_button(self):        
        subprocess.run(['adb', '-s', self.device_id, 'shell', 'input', 'keyevent', '3'])  # 3은 HOME 버튼 키 이벤트

    def close_background_app(self):
        subprocess.run(['adb', '-s', self.device_id, 'shell', 'input', 'keyevent', '187'])  # 3은 HOME 버튼 키 이벤트
        #com.sec.android.app.launcher:id/clear_all_button
        self.click_text_on_device('com.sec.android.app.launcher:id/clear_all_button')



    def sorted_jpg_dict(self):
        job_path = os.path.join(current_dir,f'main/jobs/{self.floor}F/{self.phone}/clone{self.app}')

        file_names = [f for f in os.listdir(job_path) if f.endswith('.jpg')]        
        file_dict = defaultdict(list)
        pattern = r'(\d+)\s\((\d+)\)\s#\.jpg'
        for file in file_names:
            match = re.match(pattern, file)
            if match:
                base_number = int(match.group(1))  # () 밖의 숫자
                index_number = int(match.group(2))  # () 안의 숫자
                file_dict[base_number].append(index_number)               
        sorted_files = []

        for base_number in sorted(file_dict.keys()):
            sorted_index = sorted(file_dict[base_number])
            sorted_files.append([base_number, sorted_index])

        # 결과 출력
        for entry in sorted_files:
            print(entry)
        return sorted_files

    def get_random_line(self):
        file_path = os.path.join(current_dir, 'main', '택배유도양식.txt')

        with open(file_path, 'r', encoding='utf-8') as file:
            lines = [line.strip() for line in file.readlines() if line.strip()]  # 빈 줄 제거
       
        return random.choice(lines)  # 랜덤한 한 줄 반환
    
    def delete_to_image(self):
        command = ['adb', '-s', self.device_id, 'shell', 'rm', '-rf', '/sdcard/Pictures/*']        
        print(" ".join(command))# 명령어 출력
        subprocess.run(command, check=True)

    def send_to_image(self,type=0):
        job_path = os.path.join(current_dir,f'main/jobs/{self.floor}F/{self.phone}/clone{self.app}')
        #file_path = os.path.join(job_path,f'{self.jpg_dict[0]} ({self.jpg_dict[1][0]}) #.jpg')

        # 첫 번째 인덱스의 파일을 단독으로 처리
        print(self.jpg_dict)
        if type > 0 :
            first_index = self.jpg_dict[1][0]
            file_path = os.path.join(job_path, f'{self.jpg_dict[0]} ({first_index}) #.jpg')
            command = [
                'adb', '-s', self.device_id, 'push', 
                file_path, os.path.join(phone_picture_path, f'{self.jpg_dict[0]} ({first_index}) #.jpg')
            ]            
            print(" ".join(command))# 명령어 출력
            subprocess.run(command, check=True)  # 첫 번째 파일 전송
            # 미디어 스캐너 트리거: 갤러리에서 파일을 새로 고침
            file_path_escaped = urllib.parse.quote(f'/sdcard/Pictures/{self.jpg_dict[0]} ({first_index}) #.jpg')
            media_scan_command = [
                'adb', '-s', self.device_id, 'shell', 
                'am', 'broadcast', 
                '-a', 'android.intent.action.MEDIA_SCANNER_SCAN_FILE', 
                '-d', f'file://{file_path_escaped}'
            ]                        
            print(" ".join(media_scan_command))# 미디어 스캐너 트리거 명령어 출력                       
            subprocess.run(media_scan_command, check=True)            

        else :
            # 나머지 인덱스의 파일들을 반복문으로 처리
            for index in self.jpg_dict[1][1:]:  # 첫 번째 인덱스를 제외한 나머지 항목들
                file_path = os.path.join(job_path, f'{self.jpg_dict[0]} ({index}) #.jpg')
                command = [
                    'adb', '-s', self.device_id, 'push', 
                    file_path, os.path.join(phone_picture_path, f'{self.jpg_dict[0]} ({index}) #.jpg')
                ]
                subprocess.run(command, check=True)  # 각 파일 전송     
                file_path_escaped = urllib.parse.quote(f'/sdcard/Pictures/{self.jpg_dict[0]} ({index}) #.jpg')
                media_scan_command = [
                    'adb', '-s', self.device_id, 'shell', 
                    'am', 'broadcast', 
                    '-a', 'android.intent.action.MEDIA_SCANNER_SCAN_FILE', 
                    '-d', f'file://{file_path_escaped}'
                ]
                                
                print(" ".join(media_scan_command))
                subprocess.run(media_scan_command, check=True)                      
    


    
    def parse_item_info(self):        
        matching_row =  item_infos[ item_infos[0]==self.jpg_dict[0]]
        if not matching_row.empty:
            item_info = {
                'A': matching_row.iloc[0, 0],  # A열 값 순번
                'B': matching_row.iloc[0, 1],  # B열 값 내용
                'C': matching_row.iloc[0, 2],  # C열 값 가격
                'D': matching_row.iloc[0, 3],  # D열 값 사이즈          
            }
            print(f"찾은 데이터: {item_info}")
        else:
            print(f"'{self.jpg_dict[0]}'에 해당하는 데이터를 찾을 수 없습니다.")
        return item_info

    def get_screen_dump(self):
           
        #subprocess.run(['adb', '-s', self.device_id, 'shell', 'uiautomator', 'dump'], check=True)
        result = subprocess.run(
            ['adb', '-s', self.device_id, 'shell', 'uiautomator', 'dump'],
            check=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    
        if "could not get idle state" in  result.stdout.decode('utf-8') :
            print("ERROR: could not get idle state detected. Handling error...")
              

        #ERROR: could not get idle state.
        command = ['adb', '-s', self.device_id, 'shell', 'cat', '/sdcard/window_dump.xml']
        result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)                  
        xml_content = result.stdout.decode('utf-8') 
        self.xml_dump = xml_content 
        return xml_content

    def is_stuck_loading(self,wait=10, check_interval=1):       
        if wait > 0:
            prev_xml = self.get_screen_dump()  # 처음 XML을 가져옴
            for _ in range(wait):                    
                time.sleep(check_interval)
                current_xml = self.get_screen_dump()         
                if prev_xml != current_xml:
                    return False
                prev_xml = current_xml                  
            print("⚠️ 무한 로딩 감지됨! 앱을 다시 시작합니다.")

            return True
        

    def wait_text_xml(self,text,wait=0):
        xml_content =''  
        for _ in range(wait+1):#
            matches = []
            direction = ''           
            xml_content = self.get_screen_dump()

            if '본인인증' in xml_content or '지역인증' in xml_content or '일시적으로 서비스를 사용할 수 없어요.' in xml_content:       
                self.error_message(f"(본인인증 또는 지역인증 발견)")
                raise AuthenticationError(f"{self.floor}층 {self.phone}번폰 clone{self.app} 인증에러{self.device_id} 인증에러 발생: 본인인증 또는 지역인증 발견")  # 예외 발생시킴
            
            if ('하루에 20개까지 글을 등록할 수 있어요. 내일 다시 글을 작성해보세요.' in xml_content or
                '일주일에 70개까지 글을 등록할 수 있어요. 다음 주에 다시 글을 작성해보세요.' in xml_content) :         #resource-id="com.towneers.www:id/snackbar_text"
                self.error_message(f"(업로드를 전부 소진하엿습니다)")                 
                raise UploadLimitError(f"{self.floor}층 {self.phone}번폰 clone{self.app} 상품업로드20 당일 제한")
            
            if  '사유로 이용이 제한되었습니다.' in xml_content:
                self.error_message(f"(이용이 제한되었습니다.)")
                raise UploadLimitError(f"{self.floor}층 {self.phone}번폰 clone{self.app} 기간 제한 ")   
                         
            if re.match(r'clone\d+', text):
                key = int(re.search(r'clone(\d+)', text).group(1))  # key 값 저장                                
                matches_ = re.findall(r'clone(\d+)', xml_content)
                numbers = sorted(map(int, matches_))
                if numbers:
                    min_num, max_num = numbers[0], numbers[-1]
                    if key > max_num:
                        direction ='down'
                    elif key < min_num:
                        direction = 'up'  


            #다중계정 관련  별점
            if 'com.excelliance.multiaccounts:id/iv_star5' in xml_content:
                self.click_text_on_device('com.excelliance.multiaccounts:id/iv_star5')
                self.click_text_on_device('지금 평가')
                time.sleep(1)
                return False

            #ipmonster 경우 로딩처리
            re_text = ''
            if text=="랜덤접속" :
                if ('<node index="1" text="" resource-id="" class="android.widget.ImageView" package="kr.co.appsolution.ipmonster" content-desc="" checkable="false" checked="false" clickable="false" enabled="true" focusable="false" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="[0,0][1080,2178]" />'
                   in xml_content and
                    wait > 0): #로딩중이라면
                    print('ipmonster 로딩중입니다')
                    continue
                #ipmonster 로그인 처리
                elif '아이디/비밀번호찾기' in xml_content and 'text="" resource-id="" class="android.widget.EditText"' in xml_content:
                    if self.click_text_on_device('text="" resource-id="" class="android.widget.EditText"',1,0,0,0,0,1) : #첫번재
                        self.send_text(ipmonsterId) 
                    if self.click_text_on_device('text="" resource-id="" class="android.widget.EditText"',1,0,0,0,0,2) : #두번재  
                        self.send_text(ipmonsterPw)  
                    self.click_text_on_device('content-desc="로그인"')
                    time.sleep(1)
                    continue
                else:
                    
                    pattern = re.escape("유동&#10;") + r"(.+?)" + re.escape("&#10;") + r"\d{4}-\d{2}-\d{2}" + re.escape("&#10;")                     
                    match =re.search(pattern,xml_content)
                    if match:
                        print(f'{match}')
                        extracted_value = match.group(1)
                        print(f'{extracted_value}')
                        if '서버' in extracted_value:#서버접속목록생긴경우#text=="유동&#10;그리폰서버&#10;"
                            self.click_text_on_device(f"유동&#10;{extracted_value}&#10;",0,0,0,860)
                            pass                        
                        else:
                            #랜덤접속 화면에서 ip가 잇는 정상 경우다
                            pass

            if text =="판매중 갯수":               
                re_text = re.compile(r'"판매중 (\d+)"')                

            if isinstance(re_text, re.Pattern):#text를 정규식으로 잘라야하는 경우가 생길대 처리 
                print(f'패턴 커스텀 {re_text} text 타입: {type(re_text)} 대기: {wait}')
                if text =="판매중 갯수":
                    matches = re.findall(re_text, xml_content) 
                else:
                    matches = [m.start() for m in re_text.finditer(xml_content)]
               
            else:            
                start_idx = 0
                while True:
                    start_idx = xml_content.find(text, start_idx)
                    if start_idx == -1:
                        break
                    matches.append(start_idx)
                    start_idx += len(text)                                          
           
            if matches:
                #print(matches)
                break  # 텍스트가 발견되면 반복 종료    

                       
            if direction :                                
                self.scroll_percent(48,direction)

            #time.sleep(1)  # 1초 대기 후 다시 확인
            
        #print(f'최종 {matches}')
        return matches , xml_content

    def click_text_on_device(self,text,multi=0,wait=0,duration=0,set_x=0,set_y=0,choose=0,noclick=0):
        print(f'{text} 클릭 준비')
        #if not self.process_input(): print(f'{text} 실행 통과'); return  # p 입력 시 실행 건너뜀
        #if text == "임시저장": print("임시저장 텍스트가 입력되어 건너뜁니다."); return  #테스트하는동안만 실제는 주석처리

        matches , xml_content =  self.wait_text_xml(text,wait)
        if matches:
            print(f"텍스트 '{text}'를 {len(matches)}번 찾음. 멀티조건은 {multi}  , 선택클릭은 {choose}. , 클릭안할조건은 {noclick}")
            if noclick > 0:
                print(f'클릭안하고 확인만')
                return True
            
            if choose == -1:  # 마지막 항목 선택
                matches = [matches[-1]]
            elif  choose > 0 and choose < len(matches):
                matches = [matches[choose-1]]  # 선택된 인덱스의 항목만 선택
                #print(f'선택클릭의 조건의 매치 : {matches}')
            else:
                matches = matches if multi > 0 else [matches[0]]  # multi가 0보다 크면 모두, 아니면 첫 번째 항목만

            for match_idx in matches:                                                
                start_idx = xml_content.find(text)
                if start_idx != -1:
                    bounds_start = xml_content.find('bounds', match_idx)
                    bounds_end = xml_content.find('/>', bounds_start)
                    bounds = xml_content[bounds_start + 8:bounds_end]  # bounds="x1,y1][x2,y2"
                    bounds_pattern =r'\[(\d+),(\d+)\]\[(\d+),(\d+)\]'
                    match = re.search(bounds_pattern, bounds)
                    if match:
                        x1, y1, x2, y2 = map(int, match.groups())  # 그룹에서 추출한 값을 정수로 변환
                        x = (x1 + x2) // 2  # x1과 x2의 중간값
                        y = (y1 + y2) // 2  # y1과 y2의 중간값                            
                        if set_x > 0:
                            x = set_x
                        if set_y > 0 :
                            y = set_y


                        #######################
                        if text == '여러 물건 팔기'  :  #몇몇 기종에서의 에러가 잇다. (랜더링 종류가 다름)  
                            #R3CN20GZXYX 이 기종에서만                          
                            x = 515 
                            y = 1566                                            
                            pass

                        print(f"{text} 의 좌표 ({x}, {y}) 클릭 {bounds} {duration}")                        
                        if duration > 0 :
                            subprocess.run(['adb', '-s', self.device_id, 'shell', 'input', 'touchscreen','swipe', str(x), str(y), str(x), str(y),str(duration)], check=True)                        
                        else:
                            if text == '임시저장':
                                wait_time = random.randint(30, 240)
                                print(f"{wait_time}초 후에 '임시저장'을 클릭합니다.")
                                self.log_message(f'{self.phone}폰 clone{self.app}앱 {wait_time}초 대기 후 임시저장')
                                time.sleep(wait_time)  # 랜덤한 시간 동안 대기

                            subprocess.run(['adb', '-s', self.device_id, 'shell', 'input', 'tap', str(x), str(y)], check=True)

          
            return True
        else:
            self.error_message(f"텍스트 '{text}'를 찾을 수 없습니다.")
            return False

        
    def raise_if_click_failed(self, text, retry=0):
        """ click_text_on_device가 False일 때 예외를 발생시키는 함수 """
        if not (result := self.click_text_on_device(text)): 
            self.error_message(f"Failed to click on device with text: {text}")
            if retry>0:
                raise ClickFailureErrorRetry(f"Failed to click on device with text , saver retry: {text}")
            else:
                raise ClickFailureError(f"Failed to click on device with text: {text}")
        
    def scroll_percent(self, scroll_percentage, scroll_direction='down',fast = 400):
       
        screen_height = 2180 * 0.9  # 화면 해상도 의 90#범위에서
        x = 540  # 중앙 x좌표
        init_y = 1090        

        scroll_distance = int(screen_height * (scroll_percentage / 100))         
        # 시작 y는 아래에서 10% 위, 끝 y는 scroll_percentage 비율로 계산        
        if scroll_direction == 'down':
            start_y = init_y + int(scroll_distance / 2)
            end_y =  init_y - int(scroll_distance / 2) # 주어진 비율에 맞게 끝 y 계산 (스크롤 다운)
        elif scroll_direction == 'up':
            start_y = init_y - int(scroll_distance / 2)  # 아래에서 10% 위
            end_y =  init_y + int(scroll_distance / 2) # 주어진 비율에 맞게 끝 y 계산 (스크롤 업)
        else:
            print("잘못된 방향입니다. 'down' 또는 'up'을 입력하세요.")
            return
                      
        command = ['adb', '-s', self.device_id, 'shell', 'input', 'swipe', 
            str(x), str(start_y), str(x), str(end_y), str(fast)]      
        #print(f"스크롤 시작 좌표: {start_y} 끝 좌표: {end_y} 방향 {scroll_direction} \n ")
        #print(" ".join(command))
        subprocess.run(command, check=True)

    def start(self):
        self.close_background_app()
        self.adbkeyboard_on()
        failed_attempts = [] #에러발생 저장
        for app in self.cloneapps:            
            self.app = app
            self.log_message(f'{self.phone} clone{self.app} 1차 시작')
            try:                
                self.process_clone_app()
                continue
            except AuthenticationError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                failed_attempts.append((self.phone, self.app))

            except LoadingTimeoutError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                failed_attempts.append((self.phone, self.app))
                self.close_background_app()
                continue
                
            except UploadLimitError as e:
                print(f"업로드 제한 에러 발생: {e}")
                # 업로드 제한 에러 처리 로직 추가 (예: 로그 기록, 알림 등)      
                continue
            except ClickFailureError as e:
                print(f"클릭 실패 에러 발생: {e}")                
                continue   
            except ClickFailureErrorRetry as e:
                print(f"클릭 실패 에러 발생 재시작 저장: {e}")
                failed_attempts.append((self.phone, self.app))                
                continue                
                            
            except Exception as e:
                self.error_message(f"에러 발생: {e}, phone: {self.phone}, app: {self.app}")   
                failed_attempts.append((self.phone, self.app))
                continue
            self.log_message(f'{self.phone} clone{self.app} 1차 완료')

        #self.process_input('어레발생 앱 재실행 이동')    

        if failed_attempts:
            self.retry = 1            
            self.retry_failed_attempts(failed_attempts)
        self.log_message(f'{self.phone}폰 임시저장 프로그램 종료')
        self.gboard_on()

    def delete(self):
        self.close_background_app()       
        failed_attempts = [] #에러발생 저장
        for app in self.cloneapps:            
            self.app = app
            self.log_message(f'{self.phone} clone{self.app} 삭제 시작')
            try:
                #if not self.process_input(f'clone{app} 통과할까요?'): print(f'clone{app} 앱 실행 통과'); continue  # p 입력 시 실행 건너뜀
                self.ipmonster()              
                self.do_clone_app() 
                self.delete_items()
                continue
            except AuthenticationError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                failed_attempts.append((self.phone, self.app))

            except LoadingTimeoutError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                failed_attempts.append((self.phone, self.app))
                self.close_background_app()
                continue
                
            except UploadLimitError as e:
                print(f"삭제프로그램 업로드 제한 에러 발생: {e}")
                # 업로드 제한 에러 처리 로직 추가 (예: 로그 기록, 알림 등)      
                continue
            except ClickFailureError as e:
                print(f"삭제프로그램 클릭 실패 에러 발생: {e}")
                #if not self.process_input(f'clone{app} 에러발생부분 확인 통과할까요?'): print(f'clone{app} 에러발생 다음앱실행'); continue  # p 입력 시 실행 건너뜀
                continue   
            except ClickFailureErrorRetry as e:
                print(f"삭제프로그램 클릭 실패 에러 발생 재시작 저장: {e}")
                failed_attempts.append((self.phone, self.app))
                #if not self.process_input(f'clone{app} 에러발생부분 확인 통과할까요?'): print(f'clone{app} 에러발생 다음앱실행'); continue  # p 입력 시 실행 건너뜀
                continue                
                            
            except Exception as e:
                self.error_message(f"에러 발생: {e}, phone: {self.phone}, app: {self.app}")  
                failed_attempts.append((self.phone, self.app)) 
                continue
            self.log_message(f'{self.phone} clone{self.app} 1차 완료')

        if failed_attempts:
            self.retry = 1            
            self.retry_failed_delete_attempts(failed_attempts)
        self.log_message(f'{self.phone}폰 삭제 프로그램 종료')
        self.gboard_on()


    def save(self):
        self.close_background_app()        
        failed_attempts = [] #에러발생 저장
        for app in self.cloneapps:                           
            self.app = app
            self.log_message(f'{self.phone} clone{self.app} 저장  시작')
            try:                
                ###########
                #######   작성완료 앱 과 앱사이 시작간격 , 
                wait_time = random.randint(120, 360)
                print(f"{wait_time}초 후에 {self.phone}폰 clone{self.app}앱 작성완료 시작.")                
                self.log_message(f'{self.phone}폰 clone{self.app}앱 {wait_time}초 대기 후 작성완료 시작')
                #wait_time = 5
                time.sleep(wait_time)  # 랜덤한 시간 동안 대기                    
                self.ipmonster()              
                self.do_clone_app() 
                self.save_info()
                continue
            except AuthenticationError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                failed_attempts.append((self.phone, self.app))

            except LoadingTimeoutError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                failed_attempts.append((self.phone, self.app))
                self.close_background_app()
                continue
                
            except UploadLimitError as e:
                print(f"업로드 제한 에러 발생: {e}")
                # 업로드 제한 에러 처리 로직 추가 (예: 로그 기록, 알림 등)      
                continue
            except ClickFailureError as e:
                print(f"작성작업 : 클릭 실패 에러 발생: {e}")                
                continue   
            except ClickFailureErrorRetry as e:
                print(f"작성작업 : 클릭 실패 에러 발생 재시작 저장: {e}")
                failed_attempts.append((self.phone, self.app))               
                continue                
                            
            except Exception as e:
                self.error_message(f"작성작업 에러 발생: {e}, phone: {self.phone}, app: {self.app}")   
                failed_attempts.append((self.phone, self.app))
                continue
            self.log_message(f'{self.phone} clone{self.app} 1차 완료')

        #self.process_input('어레발생 앱 재실행 이동')    
        print(f'{failed_attempts}')
        
        if failed_attempts:
            self.save_result_message(failed_attempts)

        self.log_message(f'{self.phone}폰 업로드 프로그램 종료')

  
    def process_clone_app(self):                
        self.ipmonster()              
        self.do_clone_app() 
        self.write_multi()
        jpg_dict_list = self.sorted_jpg_dict()   
        total_items = len(jpg_dict_list) 
        for index, (item, _) in enumerate(jpg_dict_list):                
            target_index = index + 1
            print(f"index: {index}, item: {item},  {target_index} 번 상품")
            self.jpg_dict = jpg_dict_list[index]
            self.upload_file()
            scroll_value = 49 if target_index == 1 else 44
                
            if total_items == target_index:
                
                #self.click_text_on_device("선호하는 거래 방식이나 자주 쓰는 문구를 한 번에 작성해보세요.") 
                #self.process_input('현재 상품 확인위해서 멈추기')  
                #self.click_text_on_device("com.towneers.www:id/ivClose")#x버튼 누르기
                #self.click_text_on_device("저장 안 함") 
                self.scroll_percent(40) 
                time.sleep(1)                    
                self.click_text_on_device("com.towneers.www:id/etArticleSharedContent")         #선호하는 거래 방식이나 자주 쓰는 문구를 한 번에 작성해보세요.
                random_line  = self.get_random_line()
                self.send_text(random_line)             
                #self.process_input('현재 상품 확인위해서 멈추기')     
                if self.click_text_on_device("임시저장"):                    
                    self.click_text_on_device("com.towneers.www:id/ivClose")#x버튼 누르기
                    if self.click_text_on_device("com.towneers.www:id/home",0,5): #com.towneers.www:id/home #홈
                        return True


            else:      
                            
                self.click_text_on_device("물품 추가")         
                self.scroll_percent(int(scroll_value))

                   

    def retry_failed_attempts(self, failed_attempts):
        
        last_failed_attempts = []
        self.close_background_app()
        self.log_message(f"\n \n 재시도 목록 {str(failed_attempts)} \n \n")
        for phone, app in failed_attempts:
            self.phone = phone
            self.app = app
            print(f"재시도: {self.floor}층 {self.phone}폰 clone{self.app} 앱")
            self.log_message(f'재시도: {self.floor}층 {self.phone} clone{self.app} 시작')  
            try:
                self.process_clone_app()
            except AuthenticationError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                last_failed_attempts .append((self.phone, self.app))
                continue
            except LoadingTimeoutError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                last_failed_attempts.append((self.phone, self.app))
                self.close_background_app()
                continue            
            except UploadLimitError as e:
                print(f"업로드 제한 에러 발생: {e}")
                last_failed_attempts .append((self.phone, self.app))
                # 업로드 제한 에러 처리 로직 추가 (예: 로그 기록, 알림 등)      
                continue
            
            except ClickFailureError as e:
                print(f"클릭 실패 에러 발생: {e}")
                last_failed_attempts .append((self.phone, self.app))
                #if not self.process_input(f'clone{app} 에러발생부분 확인 통과할까요?'): print(f'clone{app} 에러발생 다음앱실행'); continue  # p 입력 시 실행 건너뜀
                continue        

            except ClickFailureErrorRetry as e:
                print(f"클릭 실패 에러 발생 재시작 저장: {e}")
                last_failed_attempts.append((self.phone, self.app))
                #if not self.process_input(f'clone{app} 에러발생부분 확인 통과할까요?'): print(f'clone{app} 에러발생 다음앱실행'); continue  # p 입력 시 실행 건너뜀
                continue   

            except Exception as e:
                last_failed_attempts .append((self.phone, self.app))
                self.error_message(f"재시도 중 에러 발생: {e}, phone: {self.phone}, app: {self.app}")
                continue
                
            self.log_message(f'재시도: {self.floor}층 {self.phone} clone{self.app} 성공')  

        self.error_message(f"\n \n 최종 에러 목록 {str(last_failed_attempts)}")      


    def retry_failed_delete_attempts(self,failed_attempts):
        last_failed_attempts = []
        self.close_background_app()       
        failed_attempts = [] #에러발생 저장
        self.log_message(f"\n \n 삭제프로그램 재시도 목록 {str(failed_attempts)} \n \n")
        for app in self.cloneapps:            
            self.app = app
            self.log_message(f'{self.phone} clone{self.app} 삭제 시작')
            try:
                #if not self.process_input(f'clone{app} 통과할까요?'): print(f'clone{app} 앱 실행 통과'); continue  # p 입력 시 실행 건너뜀
                self.ipmonster()              
                self.do_clone_app() 
                self.delete_items()
                continue
            except AuthenticationError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                last_failed_attempts.append((self.phone, self.app))

            except LoadingTimeoutError as e:                                
                # 인증 에러 처리 로직 추가 (예: 로그 기록, 알림 등)
                last_failed_attempts.append((self.phone, self.app))
                self.close_background_app()
                continue
                
            except UploadLimitError as e:
                print(f"삭제프로그램 업로드 제한 에러 발생: {e}")
                # 업로드 제한 에러 처리 로직 추가 (예: 로그 기록, 알림 등)      
                continue
            except ClickFailureError as e:
                print(f"삭제프로그램 클릭 실패 에러 발생: {e}")
                #if not self.process_input(f'clone{app} 에러발생부분 확인 통과할까요?'): print(f'clone{app} 에러발생 다음앱실행'); continue  # p 입력 시 실행 건너뜀
                continue   
            except ClickFailureErrorRetry as e:
                print(f"삭제프로그램 클릭 실패 에러 발생 재시작 저장: {e}")
                last_failed_attempts.append((self.phone, self.app))
                #if not self.process_input(f'clone{app} 에러발생부분 확인 통과할까요?'): print(f'clone{app} 에러발생 다음앱실행'); continue  # p 입력 시 실행 건너뜀
                continue                
                            
            except Exception as e:
                self.error_message(f"삭제프로그램 에러 발생: {e}, phone: {self.phone}, app: {self.app}")   
                continue
            self.log_message(f'{self.phone} clone{self.app} 삭제프로그램 재시도 완료')

        self.error_message(f"\n \n 삭제프로그램 최종 에러 목록 {str(last_failed_attempts)}")      


    def ipmonster(self):
        if self.ipmonster_start():
            return True
        else:
            self.ipmonster_stop()
            if self.ipmonster_start():
                return True


    def ipmonster_start(self):
        self.press_home_button()
        self.click_text_on_device("ipmonster",0)  # 텍스트 클릭
        
        #self.click_text_on_device("유동&#10;그리폰서버&#10;",0,6,0,860) #서버이름은 변동될수잇으니 정규식으로 ...                
        if self.click_text_on_device("랜덤접속",0,6):
            return True
        
        
    def ipmonster_stop(self):            
        self.press_home_button()
        self.click_text_on_device("ipmonster",0,0,500)            
        self.click_text_on_device("com.sec.android.app.launcher:id/quickoption_side_button")
        self.click_text_on_device("강제 중지")
        self.click_text_on_device("확인")    
            



    def do_clone_app(self):
        if self.do_clone_app_start():
            return True
        else:
            self.close_background_app()
            self.do_clone_app_stop()
            if self.do_clone_app_start():
                return True
            else:
                self.error_message(f"로딩 에러 발생:  phone: {self.phone}, clone{self.app} 앱 로딩 실패로 에러처리 ") 
                raise LoadingTimeoutError(f"clone{self.app} 앱 로딩 실패로 에러처리 ")            

    def do_clone_app_start(self):                
        self.press_home_button()       
        self.click_text_on_device("다중 계정")  # 텍스트 클릭   
        if self.work_type == "start":
            wait_time = random.randint(1, 60)
            print(f"{wait_time}초 후에 {self.phone}폰 clone{self.app}앱을 클릭합니다.")
            self.log_message(f'{self.phone}폰 clone{self.app}앱 {wait_time}초 대기 후 앱시작 클릭')
            time.sleep(wait_time)  # 랜덤한 시간 동안 대기
        #지금 평가 #com.excelliance.multiaccounts:id/iv_star5
        if self.click_text_on_device(f'clone{self.app}"',0,8):
            if self.click_text_on_device("com.towneers.www:id/home",0,5): #com.towneers.www:id/home #홈
                return True
              
    def do_clone_app_stop(self):
        self.press_home_button()
        self.click_text_on_device("다중 계정",0,0,500)            
        self.click_text_on_device("com.sec.android.app.launcher:id/quickoption_side_button")
        self.click_text_on_device("강제 중지")
        self.click_text_on_device("확인")            
        
    def write_multi(self):        
        self.raise_if_click_failed("글쓰기")  # 텍스트 클릭        
        self.raise_if_click_failed('여러 물건 팔기')  # 텍스트 클릭        
        if self.retry > 0 :
            self.click_text_on_device("새로 쓰기")  # 텍스트 클릭 


    def save_info(self):     
        #작성 완료는 전부다 에러 발생시 에러로그로 튕겨나가게 한다. (작성완료는 재실행 작동안하고 1회만 검사)    
        self.raise_if_click_failed("글쓰기",1)  # 텍스트 클릭        
        self.raise_if_click_failed('여러 물건 팔기',1)  # 텍스트 클릭        
        self.raise_if_click_failed("이어서 쓰기",1)  # 텍스트 클릭 
        time.sleep(8)
        #if not self.process_input(f'clone{self.app} 이어서 쓰기 에러발생부분 확인 통과할까요?'): print(f'clone{self.app} 에러발생 다음앱실행')  # p 입력 시 실행 건너뜀
        self.raise_if_click_failed("com.towneers.www:id/btnPost",1) #작성 완료
        time.sleep(5)
        

    def get_xml_element(self, text, wait):
        matches , xml_content =  self.wait_text_xml(text,wait)
        if matches:
            return matches[0]
        
    def delete_items(self):
        self.raise_if_click_failed("나의 당근")
        self.scroll_percent(42)
        self.raise_if_click_failed("판매내역")        
        total_number = self.get_xml_element("판매중 갯수", 2)
        
        if total_number:
            print(f'판매중 갯수 : {total_number}, 타입: {type(total_number)}')
            #계산식
            total_number = int(total_number)
            scroll_number = int(total_number/12 + 1)
            print(f'스크롤 숫자 : {scroll_number}')
            for _ in range(scroll_number):
                self.scroll_percent(48,'down',50)
                time.sleep(5)
            #time.sleep(4)
            if total_number > 30 : #최소 몇개 이상
                min_delete_count = int(max(1, int(total_number - 30) * 0.7) )
                delete_count   =   min_delete_count  
                '''
                scale = 2.4 # 값이 커질수록 max_val 근처 확률 증가 #1 기본 2가중 3 극한가중. 0에 가가울수록 0.1 작은게 더 높은 확률
                rand = random.random() ** scale  # 지수 함수 적용
                delete_count = 1 + int(rand * (min_delete_count - 1))  # 범위 조정                '
                '''
                # 변수 출력 (디버깅용)
                print(f"total_number: {total_number}  min_delete_count: {min_delete_count} delete_count: {delete_count}")  
                
                success_count = 0           
                for _ in range(delete_count): 
                    if self.click_text_on_device('com.towneers.www:id/btnOverflows',1,0,0,0,0,-1):
                       self.raise_if_click_failed('삭제',1) 
                       #self.raise_if_click_failed('com.towneers.www:id/negativeButton') #취소            
                       self.raise_if_click_failed('com.towneers.www:id/positiveButton',1) #삭제        실제삭제용
                       success_count  = success_count  + 1 
                       time.sleep(1)
                    pass                     
                self.log_message(f'총 {total_number}에서 {delete_count} 삭제 시도 . {success_count}삭제성공')
            #self.process_input('현재 상품 확인위해서 멈추기')
#python C:\adb\handle_device.py R3CN20FR13W 1 3Floor1 delete                    #       
            



    def main_image_upload(self):
        self.delete_to_image() 
        self.send_to_image(1)
        self.raise_if_click_failed("0/10")  # 텍스트 클릭
        self.click_text_on_device("com.towneers.www:id/bg_squareImageView")
        self.click_text_on_device("완료")
        
    
    def sub_images_upload(self):
        self.delete_to_image()
        self.send_to_image()
        self.raise_if_click_failed("1/10")  # 텍스트 클릭
        self.click_text_on_device("com.towneers.www:id/bg_squareImageView",1)#나머지 이미지 전부 클릭
        self.click_text_on_device("완료")     
        
                

    def upload_file(self):         
        self.main_image_upload()#메인이미지
        item_info = self.parse_item_info()#상품정보
        price = int(item_info['C']) + delivery_fee  #배송비추가      
        self.raise_if_click_failed('resource-id="com.towneers.www:id/etTitle',1)  # 텍스트 클릭       #제목     
        self.send_text(item_info['B'])              
        time.sleep(1)#상품카테고리 
        self.raise_if_click_failed("com.towneers.www:id/chipCategory",1)   #에러발생시 재시도저장
        self.sub_images_upload()    #서브이미지         
        self.raise_if_click_failed("com.towneers.www:id/etPrice",1)  # 텍스트 클릭 ₩ 가격을 입력해주세요.  #에러발생시 재시도저장
        self.send_text(price)
        self.raise_if_click_failed("게시글 내용을 작성해주세요.",1)  # 텍스트 클릭
        self.send_text(item_info['D'],' - ')         
                    

    def adbkeyboard_on(self):

        try:
            subprocess.run(
                ["adb", "-s", str(self.device_id), "shell", "ime", "set", "com.android.adbkeyboard/.AdbIME"],
                check=True,  # 명령어 실행 중 오류가 발생하면 예외가 발생합니다.
                stdout=subprocess.PIPE,  # 표준 출력을 캡처할 수 있게 설정
                stderr=subprocess.PIPE   # 표준 오류 출력을 캡처할 수 있게 설정
            )
                    

        except Exception as e: # subprocess.CalledProcessError 
            self.error_message(f"{self.device_id} 장치에서 adbkeyboard 설정 중 오류가 발생했습니다: \n {e}")
            print(f"{self.device_id} 장치에서 adbkeyboard 설정 중 오류가 발생했습니다: \n {e}")      #.stderr.decode()

    def gboard_on(self):
        try:
            # Gboard를 기본 키보드로 설정
            subprocess.run(
                ["adb", "-s", str(self.device_id), "shell", "ime", "set",
                "com.google.android.inputmethod.latin/com.android.inputmethod.latin.LatinIME"],
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            print(f"{self.device_id} 장치의 기본 키보드가 Gboard로 변경되었습니다.")

        except Exception as e:
            self.error_message(f"{self.device_id} 장치에서 Gboard 설정 중 오류가 발생했습니다: \n {e}")
            print(f"{self.device_id} 장치에서 Gboard 설정 중 오류가 발생했습니다: \n {e}")

    def send_text(self,text,opt=''):    
        text = str(text)
        processed_text =  text.lstrip(" -")    
        text = processed_text
        if opt:
            text = processed_text + str(opt)
        else:
            text = processed_text
        if not text:
            return     
        os.system("adb -s "+str(self.device_id)+" shell am broadcast -a ADB_INPUT_TEXT --es msg '"+text+"' > nul")     


    def process_input(self,msg=''):
        user_input = input(f"{msg}  입력하세요 (x : 종료, y : 진행, p : 통과, r : 강제에러): ")    
        if user_input == 'x':
            self.gboard_on()
            print("프로그램을 종료합니다.")
            sys.exit()  # 프로그램 종료
        elif user_input == 'y':
            print("프로그램을 진행합니다.")
            return True
            # 진행할 코드 넣기
        elif user_input == 'p':
            print(f"{msg} 실행을 건너뜁니다.")
            return  # 실행을 통과시키고 함수 종료
        elif user_input == 'r':
            print(f"{msg}   강제 에러 발생 .")
            raise RuntimeError("강제 에러 발생")  # 예외 발생    
        else:
            print("잘못된 입력입니다. x, y 또는 p를 입력해주세요.")
            self.process_input()  # 재귀 호출하여 다시 입력 받기    

   
def handle_device(serial_number, device_info , floor, work_info):    
    '''
    wait_time = random.randint(5, 15)
    print(f"{wait_time}초 후에 {work_info}종료 합니다.") 
    time.sleep(wait_time)  # 랜덤한 시간 동안 대기
    sys.exit()'
    '''
    # CloneUpload 객체 초기화 시 serial_number와 device_info를 전달
    clone_upload = CloneUpload(serial_number, device_info , floor,work_info)      
    if work_info == "start":
        clone_upload.start()
    elif work_info == "delete":
        clone_upload.delete()
    elif work_info == "save":
        clone_upload.save()
   


if __name__ == "__main__":  
    
    
     
    serial_number = sys.argv[1]
    device_info = sys.argv[2:-2]  # device_info는 여러 인자가 되어있음
    floor_info = sys.argv[-2]
    work_info = sys.argv[-1]
    os.system(f'title 당근폰 제어 {floor_info}')

    print(f"Serial Number: {serial_number}")
    print(f"Device Info: {device_info}")
    print(f"Floor Info: {floor_info}")


    handle_device(serial_number, device_info, floor_info,work_info)

 

반응형
댓글
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함